• <noscript id="e0iig"><kbd id="e0iig"></kbd></noscript>
  • <td id="e0iig"></td>
  • <option id="e0iig"></option>
  • <noscript id="e0iig"><source id="e0iig"></source></noscript>
  • SpringCloud學習筆記-SpringCloud Bus、SpringCloudStream

    標簽: springcloud

    springcloudConfig 手動刷新的問題:
    假如有多個客戶端3344、3366、3377
    每個微服務都要執行一次post請求,手動刷新
    可否廣播,一次通知,處處生效?

    分布式自動刷新配置功能
    SpringCloud Bus 配合 SpringCloud Config可以實現配置的動態刷新

    1、SpringCloud Bus 消息總線

    1.1 基本概念

    Bus支持兩種消息代理:RabbitMQ 和 Kafka

    SpringCloud Bus 是用來將分布式系統的節點與輕量級消息系統連接起來的框架,它整合了Java的時間處理機制和消息中間件的功能。

    SpringCloud Bus能管理和傳播分布式系統間的消息,就像一個分布式執行器,可用于廣播狀態更改,時間推送等,也可以當做微服務之間的通信通道

    什么是總線:
    在微服務架構的系統中,通常會使用輕量級的消息代理來構建一個消息主題,并讓系統中的所有微服務實例都連接上來。由于該主題中產生的消息會被所有實例監聽和消費,所以稱它為消息總線。 在總線上的各個實例,都可以方便的廣播一些需要讓其他連接載該主題上的實例都知道的消息。

    基本原理
    ConfigClient 實例都監聽MQ 中的同一個Topic默認是(SpringCloud Bus)他會把這個信息放入Topic中,這樣其他監聽同一個Topic的服務就能得到通知,然后取更新自身的配置。

    1.2 RabbitMQ環境配置

    1. 安裝Erlang
    2. 安裝RabbitMQ
    3. 進入RabbitMQ下的bin目錄
    4. 輸入一下命令啟動安裝功能 rabbit-plugins enable rabbitmq_management
    5. 可視化插件
    6. http://localhost:15672/
    7. 默認賬號密碼 guest guest

    1.3 Bus動態刷新全局廣播通知

    制作3366
    cloud-config-client-3366
    3355也添加這個依賴

      <!--添加消息總線rabbitMQ支持-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-bus-amqp</artifactId>
            </dependency>
    

    bootstrap.yml

    server:
      port: 3366
    
    spring:
      application:
        name: config-client
      cloud:
        #Config客戶端配置
        config:
          label: master #分支名稱
          name: config #配置文件名稱
          profile: dev #讀取后綴名稱 上述3個綜合:master分支上config-dev.yml的配置文件被讀取 http://config-3344.com:3344/master/config-dev.yml
          uri: http://localhost:3344 #配置中心地址
      #rabbit相關配置 15672是web管理界面的端口,5672是MQ訪問的端口
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    #服務注冊到eureka地址
    eureka:
      client:
        service-url:
          defaultZone: http://localhost:7001/eureka
    
    #暴露監控端點
    management:
      endpoints:
        web:
          exposure:
            include: "*"
    

    主啟動類

    @SpringBootApplication
    @EnableEurekaClient
    public class ConfigClientMain3366 {
        public static void main(String[] args) {
            SpringApplication.run(ConfigClientMain3366.class,args);
        }
    }
    
    

    ConfigClientController

    @RestController
    @RefreshScope
    public class ConfigClientController {
        @Value("${server.port}")
        private String serverPort;
        @Value(("${config.info}"))
        private String configInfo;
    
        @GetMapping("/configInfo")
        public String getConfigInfo(){
            return "serverPort: "+serverPort+"\t\n\n configInfo: "+configInfo;
        }
    }
    
    1. 利用消息總線觸發一個客戶端/bus/refresh,而刷新所有客戶端配置
      1. 破壞了微服務職責的單一性,因為微服務本身就是業務模塊,它本不應該承擔刷新的職責
      2. 破壞了微服務節點的對等性
      3. 有一定局限性,例如,微服務在遷移的時候,它的網絡地址常常發生變化,此時如果想要做到自動刷新,那就會增加更多的修改
    2. 利用消息總線觸發一個服務端ConfigServer的bus/refresh端點,而刷新所有客戶端配置
      在這里插入圖片描述

    在這里插入圖片描述

    1.3.1 3344 3355 3366 添加消息總線

     <!--添加消息總線RbbitMQ支持-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-bus-amqp</artifactId>
            </dependency>
    

    yml

    #rabbitmq相關配置,暴露bus刷新配置的端點
    management:
      endpoints:  #暴露bus刷新配置的端點
        web:
          exposure:
            include: 'bus-refresh'  #凡是暴露監控、刷新的都要有actuator依賴,bus-refresh就是actuator的刷新操作
    

    3355

     <!--添加消息總線rabbitMQ支持-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-bus-amqp</artifactId>
            </dependency>
    

    yml

    #rabbit相關配置 15672是web管理界面的端口,5672是MQ訪問的端口
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    

    3366

     <!--添加消息總線rabbitMQ支持-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-bus-amqp</artifactId>
            </dependency>
    
    #rabbit相關配置 15672是web管理界面的端口,5672是MQ訪問的端口
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
    

    1.3.2 測試

    一次修改,廣播通知,出處生效

    1. 在github上修改配置文件
    2. 發送post請求 cur -X POST "http://localhost:3344/actuator/bus-refresh" (向3344發post請求)
    3. 刷新3355、3366 localhost3366/configInfo localhost3355/configInfo

    1.4 定點通知

    只通知3355 3366
    指定某一個具體實例生效
    公式 :

    http://localhost:配置中心的端口號/actuator/bus-refresh/{destination}
    

    /bus/refresh請求不再發送到具體的服務實例上。

    cur -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
    

    2、SpringCloud Stream 消息驅動

    2.1 基本概念

    解決的痛點:(MQ消息中間件)

    1. ActiveMQ
    2. RabbitMQ
    3. RocketMQ
    4. kafka

    系統可能會同時存在兩種RabbitMQ 和 kafka

    舉例:對于我們Java程序員來說,可能有時要使用ActiveMQ,有時要使用RabbitMQ,甚至還有RocketMQ以及Kafka,這之間的切換似乎很麻煩,我們很難,也沒有太多時間去精通每一門技術,那有沒有一種新技術的誕生,讓我們不再關注具體MQ的細節,自動的給我們在各種MQ內切換。

    有沒有一種新的結束誕生,讓我們不再關注具體的MQ的細節,我們只需要用一種適配綁定的方式,在各種MQ之間切換

    cloud stream : 屏蔽底層消息中間件的差異,統一消息的編程模型

    SpringCloud Stream是一個構建消息驅動微服務的框架

    應用程序通過inputs 或者outputs 來與SpringCloud Stream 中的 binder對象交互
    通過我們來binding,而Springcloud Stream的binder對象負責與消息中間件交互
    所以我們只需要搞清楚如何與Springcloud Stream交互就可以黨鞭的使用消息驅動的方式

    通過spring Integeration來連接消息代理中間件以實現消息時間驅動
    Springcloud Stream為一些供應商的消息中間件產品提供了葛新華的自動化配置實現,引用了發布訂閱、消費組、分區三個概念

    目前僅支持RabbitMQ kafka

    2. 2 設計思想

    標準MQ :
    在這里插入圖片描述

    1. 生產者和消費者之間通過消息媒介傳遞信息內容
    2. 消息必須走特定的通道

    為什么要用SpringCloud ?
    比方說我們用到了RabbitMQ和Kafka,由于這兩公分消息中間件的架構上的不同
    像RabbitMQ有exchange,kafka有Topic 和 Partitions分區
    在這里插入圖片描述
    在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由于各消息中間件構建的初衷不同,他們的實現細節上會有較大的差異性
    通過定義綁定器作為中間層,完美的實現了應用程序與消息中間細節之間的隔離
    通過向應用程序暴露統一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件的差異

    通過定義綁定器Binder作為中間層,實現了應用程序和消息中間件的細節之間的隔離。

    Binder :
    input 消費者
    output 生產者

    在這里插入圖片描述

    Stream的通信方式遵循了發布訂閱模式 Topic 主題進行分區廣播

    1. 在RabbitMQ就是Exchange
    2. 在kafka中就是Topic

    2.3 編碼API和常用注解

    在這里插入圖片描述
    Binder : 很方便的連接中間件,屏蔽差異
    Channel: 通道是隊列Queue的一種抽象,在消息通訊系統中實現存儲和轉發的媒介,通過channel 對隊列進行配置
    Source 和 Sink: 消息的生產者和消費者

    @Inout

    2.4 生產者

    cloud-stream-rabbitmq-provider8801

    1.pom

    <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    

    2.appliaction.yml

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          binders: # 在此處配置要綁定的rabbitMQ的服務信息
          defaultRabbit: # 表示定義的名稱,用于binding的整合
            type: rabbit # 消息中間件類型
            environment: # 設置rabbitMQ的相關環境配置
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
          bindings: # 服務的整合處理
            ouput: # 這個名字是一個通道的名稱
              destination: studyExchange # 表示要使用的exchange名稱定義
              content-type: application/json # 設置消息類型,本次為json,文本則設為text/plain
              binder: defaultRabbit # 設置要綁定的消息服務的具體設置
    
    eureka:
      client:
        service-url:
          defaultZone: http://eureka7001.com:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 設置心跳的間隔時間,默認30
        lease-expiration-duration-in-seconds: 5 # 超過5秒間隔,默認90
        instance-id: send-8801.com # 主機名
        prefer-ip-address: true # 顯示ip
    

    3.service

    //這不是傳統的service,這是和rabbitmq打交道的,不需要加注解@Service
    //這里不掉dao,掉消息中間件的service
    //信道channel和exchange綁定在一起
    //定義消息的推送管道
    @EnableBinding(Source.class)
    public class MessageProviderImpl implements IMessageProvider {
    
        /**
         * 消息發送管道
         */
        @Resource
        private MessageChannel output;
    
        @Override
        public String send() {
            String serial = UUID.randomUUID().toString();
            output.send(MessageBuilder.withPayload(serial).build());
            System.out.println("serial = " + serial);
            return null;
        }
    }
    

    controller

    @RestController
    public class SendMessageController {
    
        @Resource
        private IMessageProvider messageProvider;
    
        @GetMapping("/sendMessage")
        public String sendMessage(){
            return messageProvider.send();
        }
    }
    

    2.5 消費者

    cloud-stream-rabbitmq-consumer8802

     <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    

    yml :
    input: # 這個名字是一個通道的名稱
    instance-id: receive-8802.com #主機名

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此處配置要綁定的rabbitMQ的服務信息
            defaultRabbit: # 表示定義的名稱,用于binding的整合
              type: rabbit # 消息中間件類型
              environment: # 設置rabbitMQ的相關環境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服務的整合處理
            input: # 這個名字是一個通道的名稱
              destination: studyExchange # 表示要使用的exchange名稱定義
              content-type: application/json # 設置消息類型,本次為json,文本則設為text/plain
              binder: defaultRabbit # 設置要綁定的消息服務的具體設置
              #group: atguiguA
    eureka:
      client:
        service-url:
          defaultZone: http://eureka7001.com:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2 # 設置心跳的間隔時間,默認30
        lease-expiration-duration-in-seconds: 5 # 超過5秒間隔,默認90
        instance-id: receive-8802.com #主機名
        prefer-ip-address: true # 顯示ip
    

    主啟動類 :

    
    @SpringBootApplication
    public class StreamMQMain8802 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class,args);
        }
    }
    

    controller :@EnableBinding(Sink.class)

    @Component
    @EnableBinding(Sink.class)
    public class ReceiveMessageListenerController {
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(Sink.INPUT)
        public void input(Message<String> message){
            System.out.println("消費者1號,------->接收到的消息: "+message.getPayload()+"\t port: "+serverPort);
        }
    }
    

    2.6 分組消費與持久化

    7001 注冊中心
    8801 服務生產
    8802 服務消費
    8803 服務消費

    cloud-stream-rabbitmq-consumer8803 同 8802

    2.6.1 重復消費

    目前是8802/8803都收到了消息,出現了重復消費,怎么解決?
    比如在如下場景中,訂單系統我們做集群部署,都會從RabbitMQ獲取訂單信息,那如果一個訂單通同時被兩個服務獲取到,那么就會造成數據錯誤我們得避免這種情況,這是我們就可以使用Stream中的消息分組來解決。
    在這里插入圖片描述

    注意在stream中處于同一個group中的多個消費者之間是競爭關系,就能夠保證消息只會被其中一個應用消費一次,不同組是可以全面消費的(重復消費)

    2.6.2 分組

    故障現象:重復消費
    導致原因: 默認分組group是不同過得,組流水號不一樣,被認為不同組,可以重復消費

    微服務應用放置在同一個group中,就能保證消息只會被其中一個應用消費一次
    不同組是可以同時消費的,同一個組內會發生競爭關系,只有其中一個可以消費

    自定義分組 :

    8802 : group: atguiguA

    bindings: # 服務的整合處理
            input: # 這個名字是一個通道的名稱
              destination: studyExchange # 表示要使用的exchange名稱定義
              content-type: application/json # 設置消息類型,本次為json,文本則設為text/plain
              binder: defaultRabbit # 設置要綁定的消息服務的具體設置
              group: atguiguA
    

    8803 : group: atguiguB

    放在同一個分組,就不會出現重復消費
    8802 : group: atguiguA
    8803 : group: atguiguA

    2.6.3 持久化

    去掉8802的分組,8803的分組并沒有去除掉
    由于8802去掉了分組,8802從頭到尾都沒有去8801上取消息
    8803 由于配置了分組,重啟后8803會消費rabbitmq未曾消費過的消息

    3、SpringCloud Sleuth 分布式請求鏈路追蹤

    問題:
    在微服務框架中,一個由客戶端發起的請求在后端系統中會經過多個不同的服務節點調用來協同產生最后的請求結果,每一個前段請求都會行程一條復雜的分布式服務調用鏈路,鏈路中的任何一環出現高演示或錯誤都會引起整個請求最后的失敗。

    版權聲明:本文為weixin_41927235原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
    本文鏈接:https://blog.csdn.net/weixin_41927235/article/details/105698618

    智能推薦

    17.SpringCloud學習筆記--Bus消息總線

    Bus消息總線簡介 Spring Cloud Bus 配合 Spring Cloud Config 使用可以實現配置的動態刷新。 Spring Cloud Bus是用來將分布式系統的節點與輕量級消息系統鏈接起來的框架,它整合了Java的事件處理機制和消息中間件的功能。Spring Clud Bus目前支持RabbitMQ和Kafka。 Spring Cloud Bus能管理和傳播分布式系統間的消息...

    SpringCloud學習筆記【十三】Spring Cloud Bus消息總線

    文章目錄 本片要點 Spring Cloud Bus簡介 概述 什么是總線 基本原理 Docker安裝RabbitMQ 演示動態刷新全局廣播前置準備 新建模塊,引入依賴 配置bootstrap.yml 編寫主啟動類 編寫接口 設計思想 開始演示動態刷新全局廣播 為三個模塊都添加消息總線支持 為三個模塊配置yml 為ConfigServer配置yml 為ConfigClient配置yml 測試 原理...

    SpringCloud學習筆記-自動刷新配置-SpringCloud-Bus(消息總線)

    如果修改配置后,config-server通知order修改配置。 消息隊列(RabbitMQ) Bus 用來操作消息隊列。 第一步添加依賴(config-server)   spring-cloud-starter-bus-kafka    這是kafka的依賴,上面的是RabbitMq的依賴 說明:https://blog.csdn.net/jack281706/...

    SpringCloud消息總線(SpringCloud Bus)

    SpringCloud Bus 將分布式的節點用輕量的消息代理連接起來。它可以用于廣播配置文件的更改或者服務之間的通訊,也可以用于監控。本文要講述的是用Spring Cloud Bus實現通知微服務架構的配置文件的更改。 1:首先安裝rabbitmq,可自行去官網下載相關版本進行安裝;  2:修改config-client工程,添加相關依賴: 修改配置文件夾,加入mq相關的配置,完整配置...

    HTML中常用操作關于:頁面跳轉,空格

    1.頁面跳轉 2.空格的代替符...

    猜你喜歡

    freemarker + ItextRender 根據模板生成PDF文件

    1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...

    電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!

    Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...

    requests實現全自動PPT模板

    http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...

    Linux C系統編程-線程互斥鎖(四)

    互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...

    精品国产乱码久久久久久蜜桃不卡