SpringCloud學習筆記-SpringCloud Bus、SpringCloudStream
標簽: springcloud
springcloudConfig 手動刷新的問題:
假如有多個客戶端3344、3366、3377
每個微服務都要執行一次post請求,手動刷新
可否廣播,一次通知,處處生效?
分布式自動刷新配置功能
SpringCloud Bus 配合 SpringCloud Config可以實現配置的動態刷新
1、SpringCloud Bus 消息總線
1.1 基本概念
Bus支持兩種消息代理:RabbitMQ 和 Kafka
SpringCloud Bus 是用來將分布式系統的節點與輕量級消息系統連接起來的框架,它整合了Java的時間處理機制和消息中間件的功能。
SpringCloud Bus能管理和傳播分布式系統間的消息,就像一個分布式執行器,可用于廣播狀態更改,時間推送等,也可以當做微服務之間的通信通道
什么是總線:
在微服務架構的系統中,通常會使用輕量級的消息代理來構建一個消息主題,并讓系統中的所有微服務實例都連接上來。由于該主題中產生的消息會被所有實例監聽和消費,所以稱它為消息總線。 在總線上的各個實例,都可以方便的廣播一些需要讓其他連接載該主題上的實例都知道的消息。
基本原理
ConfigClient 實例都監聽MQ 中的同一個Topic默認是(SpringCloud Bus)他會把這個信息放入Topic中,這樣其他監聽同一個Topic的服務就能得到通知,然后取更新自身的配置。
1.2 RabbitMQ環境配置
- 安裝Erlang
- 安裝RabbitMQ
- 進入RabbitMQ下的bin目錄
- 輸入一下命令啟動安裝功能 rabbit-plugins enable rabbitmq_management
- 可視化插件
- http://localhost:15672/
- 默認賬號密碼 guest guest
1.3 Bus動態刷新全局廣播通知
制作3366
cloud-config-client-3366
3355也添加這個依賴
<!--添加消息總線rabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
bootstrap.yml
server:
port: 3366
spring:
application:
name: config-client
cloud:
#Config客戶端配置
config:
label: master #分支名稱
name: config #配置文件名稱
profile: dev #讀取后綴名稱 上述3個綜合:master分支上config-dev.yml的配置文件被讀取 http://config-3344.com:3344/master/config-dev.yml
uri: http://localhost:3344 #配置中心地址
#rabbit相關配置 15672是web管理界面的端口,5672是MQ訪問的端口
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
#服務注冊到eureka地址
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
#暴露監控端點
management:
endpoints:
web:
exposure:
include: "*"
主啟動類
@SpringBootApplication
@EnableEurekaClient
public class ConfigClientMain3366 {
public static void main(String[] args) {
SpringApplication.run(ConfigClientMain3366.class,args);
}
}
ConfigClientController
@RestController
@RefreshScope
public class ConfigClientController {
@Value("${server.port}")
private String serverPort;
@Value(("${config.info}"))
private String configInfo;
@GetMapping("/configInfo")
public String getConfigInfo(){
return "serverPort: "+serverPort+"\t\n\n configInfo: "+configInfo;
}
}
- 利用消息總線觸發一個客戶端/bus/refresh,而刷新所有客戶端配置
1. 破壞了微服務職責的單一性,因為微服務本身就是業務模塊,它本不應該承擔刷新的職責
2. 破壞了微服務節點的對等性
3. 有一定局限性,例如,微服務在遷移的時候,它的網絡地址常常發生變化,此時如果想要做到自動刷新,那就會增加更多的修改 - 利用消息總線觸發一個服務端ConfigServer的bus/refresh端點,而刷新所有客戶端配置
1.3.1 3344 3355 3366 添加消息總線
<!--添加消息總線RbbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
yml
#rabbitmq相關配置,暴露bus刷新配置的端點
management:
endpoints: #暴露bus刷新配置的端點
web:
exposure:
include: 'bus-refresh' #凡是暴露監控、刷新的都要有actuator依賴,bus-refresh就是actuator的刷新操作
3355
<!--添加消息總線rabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
yml
#rabbit相關配置 15672是web管理界面的端口,5672是MQ訪問的端口
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
3366
<!--添加消息總線rabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
#rabbit相關配置 15672是web管理界面的端口,5672是MQ訪問的端口
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
1.3.2 測試
一次修改,廣播通知,出處生效
- 在github上修改配置文件
- 發送post請求 cur -X POST "http://localhost:3344/actuator/bus-refresh" (向3344發post請求)
- 刷新3355、3366 localhost3366/configInfo localhost3355/configInfo
1.4 定點通知
只通知3355 3366
指定某一個具體實例生效
公式 :
http://localhost:配置中心的端口號/actuator/bus-refresh/{destination}
/bus/refresh請求不再發送到具體的服務實例上。
cur -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
2、SpringCloud Stream 消息驅動
2.1 基本概念
解決的痛點:(MQ消息中間件)
- ActiveMQ
- RabbitMQ
- RocketMQ
- kafka
系統可能會同時存在兩種RabbitMQ 和 kafka
舉例:對于我們Java程序員來說,可能有時要使用ActiveMQ,有時要使用RabbitMQ,甚至還有RocketMQ以及Kafka,這之間的切換似乎很麻煩,我們很難,也沒有太多時間去精通每一門技術,那有沒有一種新技術的誕生,讓我們不再關注具體MQ的細節,自動的給我們在各種MQ內切換。
有沒有一種新的結束誕生,讓我們不再關注具體的MQ的細節,我們只需要用一種適配綁定的方式,在各種MQ之間切換
cloud stream : 屏蔽底層消息中間件的差異,統一消息的編程模型
SpringCloud Stream是一個構建消息驅動微服務的框架
應用程序通過inputs 或者outputs 來與SpringCloud Stream 中的 binder對象交互
通過我們來binding,而Springcloud Stream的binder對象負責與消息中間件交互
所以我們只需要搞清楚如何與Springcloud Stream交互就可以黨鞭的使用消息驅動的方式
通過spring Integeration來連接消息代理中間件以實現消息時間驅動
Springcloud Stream為一些供應商的消息中間件產品提供了葛新華的自動化配置實現,引用了發布訂閱、消費組、分區三個概念
目前僅支持RabbitMQ kafka
2. 2 設計思想
標準MQ :
- 生產者和消費者之間通過消息媒介傳遞信息內容
- 消息必須走特定的通道
為什么要用SpringCloud ?
比方說我們用到了RabbitMQ和Kafka,由于這兩公分消息中間件的架構上的不同
像RabbitMQ有exchange,kafka有Topic 和 Partitions分區
在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由于各消息中間件構建的初衷不同,他們的實現細節上會有較大的差異性
通過定義綁定器作為中間層,完美的實現了應用程序與消息中間細節之間的隔離
通過向應用程序暴露統一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件的差異
通過定義綁定器Binder作為中間層,實現了應用程序和消息中間件的細節之間的隔離。
Binder :
input 消費者
output 生產者
Stream的通信方式遵循了發布訂閱模式 Topic 主題進行分區廣播
- 在RabbitMQ就是Exchange
- 在kafka中就是Topic
2.3 編碼API和常用注解
Binder : 很方便的連接中間件,屏蔽差異
Channel: 通道是隊列Queue的一種抽象,在消息通訊系統中實現存儲和轉發的媒介,通過channel 對隊列進行配置
Source 和 Sink: 消息的生產者和消費者
2.4 生產者
cloud-stream-rabbitmq-provider8801
1.pom
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2.appliaction.yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitMQ的服務信息
defaultRabbit: # 表示定義的名稱,用于binding的整合
type: rabbit # 消息中間件類型
environment: # 設置rabbitMQ的相關環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
ouput: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的exchange名稱定義
content-type: application/json # 設置消息類型,本次為json,文本則設為text/plain
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設置心跳的間隔時間,默認30
lease-expiration-duration-in-seconds: 5 # 超過5秒間隔,默認90
instance-id: send-8801.com # 主機名
prefer-ip-address: true # 顯示ip
3.service
//這不是傳統的service,這是和rabbitmq打交道的,不需要加注解@Service
//這里不掉dao,掉消息中間件的service
//信道channel和exchange綁定在一起
//定義消息的推送管道
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
/**
* 消息發送管道
*/
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("serial = " + serial);
return null;
}
}
controller
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
2.5 消費者
cloud-stream-rabbitmq-consumer8802
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
yml :
input: # 這個名字是一個通道的名稱
instance-id: receive-8802.com #主機名
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要綁定的rabbitMQ的服務信息
defaultRabbit: # 表示定義的名稱,用于binding的整合
type: rabbit # 消息中間件類型
environment: # 設置rabbitMQ的相關環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的exchange名稱定義
content-type: application/json # 設置消息類型,本次為json,文本則設為text/plain
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
#group: atguiguA
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設置心跳的間隔時間,默認30
lease-expiration-duration-in-seconds: 5 # 超過5秒間隔,默認90
instance-id: receive-8802.com #主機名
prefer-ip-address: true # 顯示ip
主啟動類 :
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class,args);
}
}
controller :@EnableBinding(Sink.class)
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消費者1號,------->接收到的消息: "+message.getPayload()+"\t port: "+serverPort);
}
}
2.6 分組消費與持久化
7001 注冊中心
8801 服務生產
8802 服務消費
8803 服務消費
cloud-stream-rabbitmq-consumer8803 同 8802
2.6.1 重復消費
目前是8802/8803都收到了消息,出現了重復消費,怎么解決?
比如在如下場景中,訂單系統我們做集群部署,都會從RabbitMQ獲取訂單信息,那如果一個訂單通同時被兩個服務獲取到,那么就會造成數據錯誤我們得避免這種情況,這是我們就可以使用Stream中的消息分組來解決。
注意在stream中處于同一個group中的多個消費者之間是競爭關系,就能夠保證消息只會被其中一個應用消費一次,不同組是可以全面消費的(重復消費)
2.6.2 分組
故障現象:重復消費
導致原因: 默認分組group是不同過得,組流水號不一樣,被認為不同組,可以重復消費
微服務應用放置在同一個group中,就能保證消息只會被其中一個應用消費一次
不同組是可以同時消費的,同一個組內會發生競爭關系,只有其中一個可以消費
自定義分組 :
8802 : group: atguiguA
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的exchange名稱定義
content-type: application/json # 設置消息類型,本次為json,文本則設為text/plain
binder: defaultRabbit # 設置要綁定的消息服務的具體設置
group: atguiguA
8803 : group: atguiguB
放在同一個分組,就不會出現重復消費
8802 : group: atguiguA
8803 : group: atguiguA
2.6.3 持久化
去掉8802的分組,8803的分組并沒有去除掉
由于8802去掉了分組,8802從頭到尾都沒有去8801上取消息
8803 由于配置了分組,重啟后8803會消費rabbitmq未曾消費過的消息
3、SpringCloud Sleuth 分布式請求鏈路追蹤
問題:
在微服務框架中,一個由客戶端發起的請求在后端系統中會經過多個不同的服務節點調用來協同產生最后的請求結果,每一個前段請求都會行程一條復雜的分布式服務調用鏈路,鏈路中的任何一環出現高演示或錯誤都會引起整個請求最后的失敗。
智能推薦
17.SpringCloud學習筆記--Bus消息總線
Bus消息總線簡介 Spring Cloud Bus 配合 Spring Cloud Config 使用可以實現配置的動態刷新。 Spring Cloud Bus是用來將分布式系統的節點與輕量級消息系統鏈接起來的框架,它整合了Java的事件處理機制和消息中間件的功能。Spring Clud Bus目前支持RabbitMQ和Kafka。 Spring Cloud Bus能管理和傳播分布式系統間的消息...
SpringCloud學習筆記【十三】Spring Cloud Bus消息總線
文章目錄 本片要點 Spring Cloud Bus簡介 概述 什么是總線 基本原理 Docker安裝RabbitMQ 演示動態刷新全局廣播前置準備 新建模塊,引入依賴 配置bootstrap.yml 編寫主啟動類 編寫接口 設計思想 開始演示動態刷新全局廣播 為三個模塊都添加消息總線支持 為三個模塊配置yml 為ConfigServer配置yml 為ConfigClient配置yml 測試 原理...
SpringCloud學習筆記-自動刷新配置-SpringCloud-Bus(消息總線)
如果修改配置后,config-server通知order修改配置。 消息隊列(RabbitMQ) Bus 用來操作消息隊列。 第一步添加依賴(config-server) spring-cloud-starter-bus-kafka 這是kafka的依賴,上面的是RabbitMq的依賴 說明:https://blog.csdn.net/jack281706/...
SpringCloud消息總線(SpringCloud Bus)
SpringCloud Bus 將分布式的節點用輕量的消息代理連接起來。它可以用于廣播配置文件的更改或者服務之間的通訊,也可以用于監控。本文要講述的是用Spring Cloud Bus實現通知微服務架構的配置文件的更改。 1:首先安裝rabbitmq,可自行去官網下載相關版本進行安裝; 2:修改config-client工程,添加相關依賴: 修改配置文件夾,加入mq相關的配置,完整配置...
猜你喜歡
freemarker + ItextRender 根據模板生成PDF文件
1. 制作模板 2. 獲取模板,并將所獲取的數據加載生成html文件 2. 生成PDF文件 其中由兩個地方需要注意,都是關于獲取文件路徑的問題,由于項目部署的時候是打包成jar包形式,所以在開發過程中時直接安照傳統的獲取方法沒有一點文件,但是當打包后部署,總是出錯。于是參考網上文章,先將文件讀出來到項目的臨時目錄下,然后再按正常方式加載該臨時文件; 還有一個問題至今沒有解決,就是關于生成PDF文件...
電腦空間不夠了?教你一個小秒招快速清理 Docker 占用的磁盤空間!
Docker 很占用空間,每當我們運行容器、拉取鏡像、部署應用、構建自己的鏡像時,我們的磁盤空間會被大量占用。 如果你也被這個問題所困擾,咱們就一起看一下 Docker 是如何使用磁盤空間的,以及如何回收。 docker 占用的空間可以通過下面的命令查看: TYPE 列出了docker 使用磁盤的 4 種類型: Images:所有鏡像占用的空間,包括拉取下來的鏡像,和本地構建的。 Con...
requests實現全自動PPT模板
http://www.1ppt.com/moban/ 可以免費的下載PPT模板,當然如果要人工一個個下,還是挺麻煩的,我們可以利用requests輕松下載 訪問這個主頁,我們可以看到下面的樣式 點每一個PPT模板的圖片,我們可以進入到詳細的信息頁面,翻到下面,我們可以看到對應的下載地址 點擊這個下載的按鈕,我們便可以下載對應的PPT壓縮包 那我們就開始做吧 首先,查看網頁的源代碼,我們可以看到每一...
Linux C系統編程-線程互斥鎖(四)
互斥鎖 互斥鎖也是屬于線程之間處理同步互斥方式,有上鎖/解鎖兩種狀態。 互斥鎖函數接口 1)初始化互斥鎖 pthread_mutex_init() man 3 pthread_mutex_init (找不到的情況下首先 sudo apt-get install glibc-doc sudo apt-get install manpages-posix-dev) 動態初始化 int pthread_...