RabbitMQ之(三)多種消息模型實戰
標簽: rabbitMQ
在RabbitMQ的核心組件交換機中有4種典型的消息模式,即HeadersExchange,FanoutExchange,DirectExchange ,TopicExchange的四種模式,生產環境中,常用后面三種。本節將講述下面幾種
基于FanoutExchange的消息模型實戰
FanoutExchange具有廣播的作用,當消息進入這個中轉站的時候,交換機會檢查哪個隊列跟自己綁定一起的,找到相應的隊列后,由隊列對應的消費者進行監聽消費。
就是一對多的原理
還是在RabbitMQConfig下添加交換機,路由
package com.learn.boot.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.validation.BindingResult;
/** zlx
* RabbitMQ自定義注入配置Bean相關組件
*/
@Configuration
public class RabbitmqConfig {
private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);
// 自動設置RabbitMQ的連接工廠實例
@Autowired
private CachingConnectionFactory connectionFactory;
// 自動設置消息監聽器所在的容器工廠配置類實例
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 下面為單一消費者實例的配置
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
// 定義消息監聽器所在的容器工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 設置容器工廠所用的實例
factory.setConnectionFactory(connectionFactory);
// 設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// 設置并發消費者實例的初始數量,在這里為1個
factory.setConcurrentConsumers(1);
// 設置并發消費者實例的最大數量,在這里為1個
factory.setMaxConcurrentConsumers(1);
// 設置并發消費者實例中每個實例拉取的消息數量,在這里為1個
factory.setPrefetchCount(1);
return factory;
}
/**
*下面為多個消費者實例的配置,主要是針對高并發業務場景的配置
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
// 定義消息監聽器所在的容器工廠
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 設置容器工廠所用的實例
factoryConfigurer.configure(factory,connectionFactory);
// 設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// 設置消息的確認消費模式,在這里為NONE,表示不需要確認消費
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
// 設置并發消費者實例的初始數量,在這里為10個
factory.setConcurrentConsumers(10);
// 設置并發消費者實例的最大數量,在這里為15個
factory.setMaxConcurrentConsumers(15);
// 設置并發消費者實例中每個實例拉取的消息數量,在這里為10個
factory.setPrefetchCount(10);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
// 設置“發送消息后返回確認信息”
connectionFactory.setPublisherReturns(true);
// 構造發送消息組件實例對象
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 這里是為了消息確認和手動ACK
rabbitTemplate.setMandatory(true);
// 設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 發送消息后,如果發送成功,則輸出“消息發送成功”的反饋信息
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息發送成功:correlationData({}),ack({}), cause({})", correlationData,ack,cause);
}
});
// 發送消息后,如果發送失敗,則輸出“消息丟失”的反饋信息
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丟失:exchange({}),route({}),replyCode ({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,
message);
}
});
// 最終返回RabbitMQ的操作組件實例RabbitTemplate
return rabbitTemplate;
}
// 創建隊列
@Bean(name = "basicQueue")
public Queue basicQueue() {
/*
// durable :是否持久化(宕機以后重新發布)
// exclusive : 是否排外,當前隊列只能被一個消費者消費
// autoDelete 如果這個隊列沒有消費者,隊列是否被刪除
// arguments 指定當前隊列的其他信息
*/
return new Queue("basicQueue",true,false,false,null);
}
@Bean
public DirectExchange basicExchange(){
// 跟創建隊列一樣
return new DirectExchange("basicExchange",true,false);
}
@Bean
public Binding basicBinding() {
// 綁定路由key
return BindingBuilder.bind(basicQueue()).to(basicExchange()).with("basicExchange-basicQueue-key");
}
/**創建消息模型FanoutExchange ------------------------------------------------------------------------------**/
/** FanoutExchange 交換機
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange",true,false);
}
//創建隊列1
@Bean(name = "fanoutQueueOne")
public Queue fanoutQueueOne(){
return new Queue("fanoutQueueOne",true,false,false,null);
}
//創建隊列2
@Bean(name = "fanoutQueueTwo")
public Queue fanoutQueueTwo(){
return new Queue("fanoutQueueTwo",true,false,false,null);
}
//創建綁定1
@Bean
public Binding fanoutBindingOne(){
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
//創建綁定2
@Bean
public Binding fanoutBindingTwo(){
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}
}
@RequestMapping("/rabbitTestFanoutExchange")
public ResultVo rabbitTestFanoutExchange(@RequestBody User user) {
// 注意這里不需要指定路由key了,綁定交換機就行
rabbitTemplate.convertAndSend("fanoutExchange", "", user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//獲取消息的屬性
MessageProperties messageProperties = message.getMessageProperties();
//設置消息的持久化模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//設置消息的類型(在這里指定消息類型為Person類型)
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
//返回消息實例
return message;
}
});
log.info("生產者發送對象消息{}",user);
return ResultVo.success("生產者發送信息成功");
}
@RabbitListener(queues = "fanoutQueueOne",containerFactory = "singleListenerContainer")
public void receiveFanoutResultOne(@Payload User user){
try {
log.info("基本消息模型-消費者1-監聽消費消息:{} ",user);
}catch (Exception e){
log.error("基本消息模型-消費者1-發生異常:",e.fillInStackTrace());
}
}
@RabbitListener(queues = "fanoutQueueTwo",containerFactory = "singleListenerContainer")
public void receiveFanoutResultTwo(@Payload User user){
try {
log.info("消息模型-消費者2-監聽消費消息:{} ",user);
}catch (Exception e){
log.error("基本消息模型-消費者2-發生異常:",e.fillInStackTrace());
}
}
這種模式適用于業務數據需要廣播式的傳播,比如用戶操作寫日志,將用戶操作的日志封裝成實體類,并將其序列化
基于DirectExchange的消息模型實戰
這是最正規的消息模式,這個必須交換機和路由綁定,才能把消息發送到隊列。
這應該是RabbitMQ最簡單的消息模型,在上面的基礎上,綁定一個固有的路由key就行了
/**創建消息模型directExchange----------------------------------------------------------------- **/
//創建交換機directExchange
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange",true,false);
}
//創建隊列1
@Bean(name = "directQueueOne")
public Queue directQueueOne(){
return new Queue("directQueueOne",true);
}
//創建隊列2
@Bean(name = "directQueueTwo")
public Queue directQueueTwo(){
return new Queue("directQueueOne",true);
}
//創建綁定1
@Bean
public Binding directBindingOne(){
return BindingBuilder.bind(directQueueOne()).to(directExchange()).
with("directExchange-key-one");
}
//創建綁定2
@Bean
public Binding directBindingTwo(){
return BindingBuilder.bind(directQueueTwo()).to(directExchange()).
with("directExchange-key-two");
}
@RequestMapping("/rabbitTestDirectExchange")
public ResultVo rabbitTestDirectExchange(@RequestBody User user) {
// 注意這里不需要指定路由key了,綁定交換機就行
rabbitTemplate.convertAndSend("directExchange", "directExchange-key-one", user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//獲取消息的屬性
MessageProperties messageProperties = message.getMessageProperties();
//設置消息的持久化模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//設置消息的類型(在這里指定消息類型為User類型)
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
//返回消息實例
return message;
}
});
log.info("生產者發送對象消息{}",user);
return ResultVo.success("生產者發送信息成功");
}
@RequestMapping("/rabbitTestDirectExchangeTwo")
public ResultVo rabbitTestDirectExchangeTwo(@RequestBody User user) {
// 注意這里不需要指定路由key了,綁定交換機就行
rabbitTemplate.convertAndSend("directExchange", "directExchange-key-two", user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//獲取消息的屬性
MessageProperties messageProperties = message.getMessageProperties();
//設置消息的持久化模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//設置消息的類型(在這里指定消息類型為User類型)
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
//返回消息實例
return message;
}
});
log.info("生產者發送對象消息{}",user);
return ResultVo.success("生產者發送信息成功");
}
@RabbitListener(queues = "directQueueOne",containerFactory = "singleListenerContainer")
public void receiveDirectQueueOne(@Payload User user){
try {
log.info("direct消息模型-消費者1-監聽消費消息:{} ",user);
}catch (Exception e){
log.error("direct消息模型-消費者1-發生異常:",e.fillInStackTrace());
}
}
@RabbitListener(queues = "directQueueTwo",containerFactory = "singleListenerContainer")
public void receiveDirectQueueTwo(@Payload User user){
try {
log.info("direct消息模型-消費者2-監聽消費消息:{} ",user);
}catch (Exception e){
log.error("direct消息模型-消費者2-發生異常:",e.fillInStackTrace());
}
}
指定路由key:directExchange-key-one
指定路由key:directExchange-key-two
基于TopicExchange的消息模型實戰
/**創建消息模型topicExchange **/
//創建交換機topicExchange
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange", true,false);
}
//創建隊列1
@Bean(name = "topicQueueOne")
public Queue topicQueueOne(){
return new Queue("topicQueueOne",true);
}
//創建隊列2
@Bean(name = "topicQueueTwo")
public Queue topicQueueTwo(){
return new Queue("topicQueueTwo",true);
}
//創建綁定,含通配符*的路由
@Bean
public Binding topicBindingOne(){
return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).
with("*.red.*");
}
//創建綁定,含通配符#的路由
@Bean
public Binding topicBindingTwo(){
return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).
with("fast.#");
}
@RequestMapping("/rabbitTestTopicExchange1")
public ResultVo rabbitTestRabbitTestTopicExchangeOne(@RequestBody User user) {
// 這里使用通配符
rabbitTemplate.convertAndSend("topicExchange", "slow.red.dog", user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//獲取消息的屬性
MessageProperties messageProperties = message.getMessageProperties();
//設置消息的持久化模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//設置消息的類型(在這里指定消息類型為User類型)
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
//返回消息實例
return message;
}
});
log.info("生產者發送對象消息{}",user);
return ResultVo.success("生產者發送信息成功");
}
@RequestMapping("/rabbitTestTopicExchange2")
public ResultVo rabbitTestRabbitTestTopicExchangeTwo(@RequestBody User user) {
// 這里使用通配符#
rabbitTemplate.convertAndSend("topicExchange", "fast.red.monkey", user, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//獲取消息的屬性
MessageProperties messageProperties = message.getMessageProperties();
//設置消息的持久化模式
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//設置消息的類型(在這里指定消息類型為User類型)
messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
//返回消息實例
return message;
}
});
log.info("生產者發送對象消息{}",user);
return ResultVo.success("生產者發送信息成功");
}
@RabbitListener(queues = "topicQueueOne",containerFactory = "singleListenerContainer")
public void receiveTopicQueueOne(@Payload User user){
try {
log.info("topic消息模型-消費者topicQueueOne-監聽消費消息:{} ",user);
}catch (Exception e){
log.error("topic消息模型-消費者topicQueueOne-發生異常:",e.fillInStackTrace());
}
}
@RabbitListener(queues = "topicQueueTwo",containerFactory = "singleListenerContainer")
public void receiveTopicQueueTwo(@Payload User user){
try {
log.info("topic消息模型-消費者topicQueueTwo-監聽消費消息:{} ",user);
}catch (Exception e){
log.error("topic消息模型-消費者topicQueueTwo-發生異常:",e.fillInStackTrace());
}
}
這種消息模型,最大的特點就是支持通配路由,以*和#作為通配符,從而綁定不同的隊列,*表示一個特定的單詞,而#表示任意
slow.red.dog
fast.red.monkey
智能推薦
電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!
Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...
requests實現全自動PPT模板
http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...
Linux C系統編程-線程互斥鎖(四)
互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...
統計學習方法 - 樸素貝葉斯
引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...
猜你喜歡
styled-components —— React 中的 CSS 最佳實踐
https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...
19.vue中封裝echarts組件
19.vue中封裝echarts組件 1.效果圖 2.echarts組件 3.使用組件 按照組件格式整理好數據格式 傳入組件 home.vue 4.接口返回數據格式...
【一只蒟蒻的刷題歷程】【藍橋杯】歷屆試題 九宮重排 (八數碼問題:BFS+集合set)
資源限制 時間限制:1.0s 內存限制:256.0MB 問題描述 如下面第一個圖的九宮格中,放著 1~8 的數字卡片,還有一個格子空著。與空格子相鄰的格子中的卡片可以移動到空格中。經過若干次移動,可以形成第二個圖所示的局面。 我們把第一個圖的局面記為:12345678. 把第二個圖的局面記為:123.46758 顯然是按從上到下,從左到右的順序記錄數字,空格記為句點。 本題目的任務是已知九宮的初態...