• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • RabbitMQ之(三)多種消息模型實戰

    標簽: rabbitMQ


    在RabbitMQ的核心組件交換機中有4種典型的消息模式,即HeadersExchange,FanoutExchange,DirectExchange ,TopicExchange的四種模式,生產環境中,常用后面三種。本節將講述下面幾種

    基于FanoutExchange的消息模型實戰

    在這里插入圖片描述
    在這里插入圖片描述

    FanoutExchange具有廣播的作用,當消息進入這個中轉站的時候,交換機會檢查哪個隊列跟自己綁定一起的,找到相應的隊列后,由隊列對應的消費者進行監聽消費。
    就是一對多的原理
    還是在RabbitMQConfig下添加交換機,路由

    package com.learn.boot.config;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.validation.BindingResult;
    
    /** zlx
     *   RabbitMQ自定義注入配置Bean相關組件
     */
    @Configuration
    public class RabbitmqConfig {
        private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);
    
        // 自動設置RabbitMQ的連接工廠實例
        @Autowired
        private CachingConnectionFactory connectionFactory;
        // 自動設置消息監聽器所在的容器工廠配置類實例
        @Autowired
        private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
    
    
        /**
         * 下面為單一消費者實例的配置
         * @return
         */
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainer(){
            // 定義消息監聽器所在的容器工廠
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            // 設置容器工廠所用的實例
            factory.setConnectionFactory(connectionFactory);
            // 設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            // 設置并發消費者實例的初始數量,在這里為1個
            factory.setConcurrentConsumers(1);
            // 設置并發消費者實例的最大數量,在這里為1個
            factory.setMaxConcurrentConsumers(1);
            // 設置并發消費者實例中每個實例拉取的消息數量,在這里為1個
            factory.setPrefetchCount(1);
            return factory;
        }
    
        /**
         *下面為多個消費者實例的配置,主要是針對高并發業務場景的配置
         * @return
         */
        @Bean(name = "multiListenerContainer")
        public SimpleRabbitListenerContainerFactory multiListenerContainer(){
            // 定義消息監聽器所在的容器工廠
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            // 設置容器工廠所用的實例
            factoryConfigurer.configure(factory,connectionFactory);
            // 設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            // 設置消息的確認消費模式,在這里為NONE,表示不需要確認消費
            factory.setAcknowledgeMode(AcknowledgeMode.NONE);
            // 設置并發消費者實例的初始數量,在這里為10個
            factory.setConcurrentConsumers(10);
            // 設置并發消費者實例的最大數量,在這里為15個
            factory.setMaxConcurrentConsumers(15);
            // 設置并發消費者實例中每個實例拉取的消息數量,在這里為10個
            factory.setPrefetchCount(10);
            return factory;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(){
            // 設置“發送消息后返回確認信息”
            connectionFactory.setPublisherReturns(true);
            // 構造發送消息組件實例對象
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // 這里是為了消息確認和手動ACK
            rabbitTemplate.setMandatory(true);
            // 設置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            // 發送消息后,如果發送成功,則輸出“消息發送成功”的反饋信息
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("消息發送成功:correlationData({}),ack({}), cause({})", correlationData,ack,cause);
                }
            });
            // 發送消息后,如果發送失敗,則輸出“消息丟失”的反饋信息
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.info("消息丟失:exchange({}),route({}),replyCode ({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,
                    message);
                }
            });
            // 最終返回RabbitMQ的操作組件實例RabbitTemplate
            return rabbitTemplate;
        }
    
    
        // 創建隊列
        @Bean(name = "basicQueue")
        public Queue basicQueue() {
        /*
            // durable :是否持久化(宕機以后重新發布)
            // exclusive : 是否排外,當前隊列只能被一個消費者消費
            // autoDelete 如果這個隊列沒有消費者,隊列是否被刪除
            // arguments 指定當前隊列的其他信息
            */
            return new Queue("basicQueue",true,false,false,null);
        }
    
        @Bean
        public DirectExchange basicExchange(){
            // 跟創建隊列一樣
            return new DirectExchange("basicExchange",true,false);
        }
    
        @Bean
        public Binding basicBinding() {
            // 綁定路由key
            return BindingBuilder.bind(basicQueue()).to(basicExchange()).with("basicExchange-basicQueue-key");
    
        }
    
        /**創建消息模型FanoutExchange ------------------------------------------------------------------------------**/
    
        /** FanoutExchange 交換機
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange",true,false);
        }
    
        //創建隊列1
        @Bean(name = "fanoutQueueOne")
        public Queue fanoutQueueOne(){
            return new Queue("fanoutQueueOne",true,false,false,null);
        }
    
        //創建隊列2
        @Bean(name = "fanoutQueueTwo")
        public Queue fanoutQueueTwo(){
            return new Queue("fanoutQueueTwo",true,false,false,null);
        }
        //創建綁定1
        @Bean
        public Binding fanoutBindingOne(){
            return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
        }
        //創建綁定2
        @Bean
        public Binding fanoutBindingTwo(){
            return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
        }
    }
    
    
        @RequestMapping("/rabbitTestFanoutExchange")
        public ResultVo rabbitTestFanoutExchange(@RequestBody User user) {
            // 注意這里不需要指定路由key了,綁定交換機就行
            rabbitTemplate.convertAndSend("fanoutExchange", "", user, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //獲取消息的屬性
                    MessageProperties messageProperties = message.getMessageProperties();
                    //設置消息的持久化模式
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //設置消息的類型(在這里指定消息類型為Person類型)
                    messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
                    //返回消息實例
                    return message;
                }
            });
            log.info("生產者發送對象消息{}",user);
            return ResultVo.success("生產者發送信息成功");
        }
    
        @RabbitListener(queues = "fanoutQueueOne",containerFactory = "singleListenerContainer")
        public void receiveFanoutResultOne(@Payload User user){
            try {
                log.info("基本消息模型-消費者1-監聽消費消息:{} ",user);
            }catch (Exception e){
                log.error("基本消息模型-消費者1-發生異常:",e.fillInStackTrace());
            }
        }
    
        @RabbitListener(queues = "fanoutQueueTwo",containerFactory = "singleListenerContainer")
        public void receiveFanoutResultTwo(@Payload User user){
            try {
                log.info("消息模型-消費者2-監聽消費消息:{} ",user);
            }catch (Exception e){
                log.error("基本消息模型-消費者2-發生異常:",e.fillInStackTrace());
            }
        }
    

    在這里插入圖片描述
    這種模式適用于業務數據需要廣播式的傳播,比如用戶操作寫日志,將用戶操作的日志封裝成實體類,并將其序列化

    基于DirectExchange的消息模型實戰

    這是最正規的消息模式,這個必須交換機和路由綁定,才能把消息發送到隊列。
    在這里插入圖片描述
    這應該是RabbitMQ最簡單的消息模型,在上面的基礎上,綁定一個固有的路由key就行了

        /**創建消息模型directExchange----------------------------------------------------------------- **/
        //創建交換機directExchange
        @Bean
        public DirectExchange directExchange(){
            return new DirectExchange("directExchange",true,false);
        }
        //創建隊列1
        @Bean(name = "directQueueOne")
        public Queue directQueueOne(){
            return new Queue("directQueueOne",true);
        }
        //創建隊列2
        @Bean(name = "directQueueTwo")
        public Queue directQueueTwo(){
            return new Queue("directQueueOne",true);
        }
        //創建綁定1
        @Bean
        public Binding directBindingOne(){
            return BindingBuilder.bind(directQueueOne()).to(directExchange()).
                    with("directExchange-key-one");
        }
        //創建綁定2
        @Bean
        public Binding directBindingTwo(){
            return BindingBuilder.bind(directQueueTwo()).to(directExchange()).
                    with("directExchange-key-two");
        }
    
      @RequestMapping("/rabbitTestDirectExchange")
        public ResultVo rabbitTestDirectExchange(@RequestBody User user) {
            // 注意這里不需要指定路由key了,綁定交換機就行
            rabbitTemplate.convertAndSend("directExchange", "directExchange-key-one", user, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //獲取消息的屬性
                    MessageProperties messageProperties = message.getMessageProperties();
                    //設置消息的持久化模式
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //設置消息的類型(在這里指定消息類型為User類型)
                    messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
                    //返回消息實例
                    return message;
                }
            });
            log.info("生產者發送對象消息{}",user);
            return ResultVo.success("生產者發送信息成功");
        }
    
        @RequestMapping("/rabbitTestDirectExchangeTwo")
        public ResultVo rabbitTestDirectExchangeTwo(@RequestBody User user) {
            // 注意這里不需要指定路由key了,綁定交換機就行
            rabbitTemplate.convertAndSend("directExchange", "directExchange-key-two", user, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //獲取消息的屬性
                    MessageProperties messageProperties = message.getMessageProperties();
                    //設置消息的持久化模式
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //設置消息的類型(在這里指定消息類型為User類型)
                    messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
                    //返回消息實例
                    return message;
                }
            });
            log.info("生產者發送對象消息{}",user);
            return ResultVo.success("生產者發送信息成功");
        }
    
     @RabbitListener(queues = "directQueueOne",containerFactory = "singleListenerContainer")
        public void receiveDirectQueueOne(@Payload User user){
            try {
                log.info("direct消息模型-消費者1-監聽消費消息:{} ",user);
            }catch (Exception e){
                log.error("direct消息模型-消費者1-發生異常:",e.fillInStackTrace());
            }
        }
    
        @RabbitListener(queues = "directQueueTwo",containerFactory = "singleListenerContainer")
        public void receiveDirectQueueTwo(@Payload User user){
            try {
                log.info("direct消息模型-消費者2-監聽消費消息:{} ",user);
            }catch (Exception e){
                log.error("direct消息模型-消費者2-發生異常:",e.fillInStackTrace());
            }
        }
    

    指定路由key:directExchange-key-one
    在這里插入圖片描述
    指定路由key:directExchange-key-two
    在這里插入圖片描述

    基于TopicExchange的消息模型實戰

    在這里插入圖片描述

    
        /**創建消息模型topicExchange **/
    //創建交換機topicExchange
        @Bean
        public TopicExchange topicExchange(){
            return new TopicExchange("topicExchange", true,false);
        }
        //創建隊列1
        @Bean(name = "topicQueueOne")
        public Queue topicQueueOne(){
            return new Queue("topicQueueOne",true);
        }
        //創建隊列2
        @Bean(name = "topicQueueTwo")
        public Queue topicQueueTwo(){
            return new Queue("topicQueueTwo",true);
        }
        //創建綁定,含通配符*的路由
        @Bean
        public Binding topicBindingOne(){
            return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).
                    with("*.red.*");
        }
        //創建綁定,含通配符#的路由
        @Bean
        public Binding topicBindingTwo(){
            return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).
                    with("fast.#");
        }
    
       @RequestMapping("/rabbitTestTopicExchange1")
        public ResultVo rabbitTestRabbitTestTopicExchangeOne(@RequestBody User user) {
            // 這里使用通配符
            rabbitTemplate.convertAndSend("topicExchange", "slow.red.dog", user, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //獲取消息的屬性
                    MessageProperties messageProperties = message.getMessageProperties();
                    //設置消息的持久化模式
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //設置消息的類型(在這里指定消息類型為User類型)
                    messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
                    //返回消息實例
                    return message;
                }
            });
            log.info("生產者發送對象消息{}",user);
            return ResultVo.success("生產者發送信息成功");
        }
    
        @RequestMapping("/rabbitTestTopicExchange2")
        public ResultVo rabbitTestRabbitTestTopicExchangeTwo(@RequestBody User user) {
            // 這里使用通配符#
            rabbitTemplate.convertAndSend("topicExchange", "fast.red.monkey", user, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //獲取消息的屬性
                    MessageProperties messageProperties = message.getMessageProperties();
                    //設置消息的持久化模式
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    //設置消息的類型(在這里指定消息類型為User類型)
                    messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,User.class);
                    //返回消息實例
                    return message;
                }
            });
            log.info("生產者發送對象消息{}",user);
            return ResultVo.success("生產者發送信息成功");
        }
    
        @RabbitListener(queues = "topicQueueOne",containerFactory = "singleListenerContainer")
        public void receiveTopicQueueOne(@Payload User user){
            try {
                log.info("topic消息模型-消費者topicQueueOne-監聽消費消息:{} ",user);
            }catch (Exception e){
                log.error("topic消息模型-消費者topicQueueOne-發生異常:",e.fillInStackTrace());
            }
        }
    
    
        @RabbitListener(queues = "topicQueueTwo",containerFactory = "singleListenerContainer")
        public void receiveTopicQueueTwo(@Payload User user){
            try {
                log.info("topic消息模型-消費者topicQueueTwo-監聽消費消息:{} ",user);
            }catch (Exception e){
                log.error("topic消息模型-消費者topicQueueTwo-發生異常:",e.fillInStackTrace());
            }
        }
    

    這種消息模型,最大的特點就是支持通配路由,以*和#作為通配符,從而綁定不同的隊列,*表示一個特定的單詞,而#表示任意
    slow.red.dog
    在這里插入圖片描述

    fast.red.monkey
    在這里插入圖片描述

    版權聲明:本文為weixin_42324034原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/weixin_42324034/article/details/108405644

    智能推薦

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    統計學習方法 - 樸素貝葉斯

    引入問題:一機器在良好狀態生產合格產品幾率是 90%,在故障狀態生產合格產品幾率是 30%,機器良好的概率是 75%。若一日第一件產品是合格品,那么此日機器良好的概率是多少。 貝葉斯模型 生成模型與判別模型 判別模型,即要判斷這個東西到底是哪一類,也就是要求y,那就用給定的x去預測。 生成模型,是要生成一個模型,那就是誰根據什么生成了模型,誰就是類別y,根據的內容就是x 以上述例子,判斷一個生產出...

    猜你喜歡

    styled-components —— React 中的 CSS 最佳實踐

    https://zhuanlan.zhihu.com/p/29344146 Styled-components 是目前 React 樣式方案中最受關注的一種,它既具備了 css-in-js 的模塊化與參數化優點,又完全使用CSS的書寫習慣,不會引起額外的學習成本。本文是 styled-components 作者之一 Max Stoiber 所寫,首先總結了前端組件化樣式中的最佳實踐原則,然后在此基...

    基于TCP/IP的網絡聊天室用Java來實現

    基于TCP/IP的網絡聊天室實現 開發工具:eclipse 開發環境:jdk1.8 發送端 接收端 工具類 運行截圖...

    19.vue中封裝echarts組件

    19.vue中封裝echarts組件 1.效果圖 2.echarts組件 3.使用組件 按照組件格式整理好數據格式 傳入組件 home.vue 4.接口返回數據格式...

    劍指Offer39-調整數組順序使奇數位于偶數前面

    一開始想著用冒泡排序的方法來做,但是bug還是很多,后來看了評論區答案,發現直接空間換時間是最簡單的,而且和快排的寫法是類似的。...

    【一只蒟蒻的刷題歷程】【藍橋杯】歷屆試題 九宮重排 (八數碼問題:BFS+集合set)

    資源限制 時間限制:1.0s 內存限制:256.0MB 問題描述 如下面第一個圖的九宮格中,放著 1~8 的數字卡片,還有一個格子空著。與空格子相鄰的格子中的卡片可以移動到空格中。經過若干次移動,可以形成第二個圖所示的局面。 我們把第一個圖的局面記為:12345678. 把第二個圖的局面記為:123.46758 顯然是按從上到下,從左到右的順序記錄數字,空格記為句點。 本題目的任務是已知九宮的初態...

    精品国产乱码久久久久久蜜桃不卡