您现在的位置是:首页 >技术交流 >SpringCloud_Config配置中心和Bus消息总线和Stream消息驱动网站首页技术交流
SpringCloud_Config配置中心和Bus消息总线和Stream消息驱动
一、SpringCloudConfig配置中心
1、SpringCloudConfig配置中心的概论
- 在分布式系统中,由于服务数量巨多,为了方便服务配置文件统一管理,实时更新,所以需要分布式配置中心组件。
SpringCloudConfig项目是一个解决分布式系统的配置管理问题。
SpringCloudConfig项目包含了client和server两个部分。 - Config的作用:
1)提供服务端和客户端支持
2)集中管理各环境的配置文件
3)配置文件修改之后,可以快速的生效
4)可以进行版本管理
5)支持大的并发查询
6)支持各种语言
2、SpringCloudConfig配置中心的gitee仓库搭建
-
登录码云gitee,然后点击+号,新建仓库,输入仓库名称以及路径后,点击创建即可。
设置成开源的
随便写一下介绍,以及后面公开须知的三个选项都打上勾即可,点击保存即可。 -
将gitee的仓库拉取到指定的本地文件中
1)在gitee中点击进入刚刚创建的仓库中,复制HTTPS的链接
2)在存放的文件夹中右键选择Git Bash Here
3)在刚刚打开的git命令行中拉取gitee的远程仓库到本地文件中
先输入这两个命令
再拉取即可
-
创建并编写对应的yml配置文件
1)在拉取的仓库内,也就是有.git文件的文件夹中创建config-dev.yml、config-prod.yml、config-test.yml文件
2)编辑config-dev.yml文件config: info: config-dev.yml version=1
编辑config-test.yml文件
config: info: config-test.yml version=1
编辑config-prod.yml文件
config: info: config-prod.yml version=1
3)在.git的文件夹中右键选择Git Bash Here,即再次打开git命令行
即将刚刚创建的3个文件都上传到码云gitee。第一个指令是指将当前文件夹的文件上传,第二个指令是上传到新增文件配置中,第三个是上传指令。
也就是说第一个是指定要上传的文件夹,第二是上传到哪个文件夹,第三个是上传。
4)上传成功后可以看到如下
3、SpringCloudConfig配置中心服务端的搭建
-
在cloud父项目中创建子模块项目cloud-config-server3344
-
在cloud-config-server3344项目的POM文件中添加依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud</artifactId> <groupId>com.zzx</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-config-server3344</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <!-- 引入Eureka client依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!-- 引入 config 依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency> <!-- 引入 web 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> </dependencies> </project>
-
修改cloud-config-server3344项目的com.zzx包下的主启动类Main,修改为ConfigServer3344,修改代码如下
package com.zzx; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.config.server.EnableConfigServer; /** * 主启动类 */ @SpringBootApplication @Slf4j // 开启配置中心功能 @EnableConfigServer public class ConfigServer3344 { public static void main(String[] args) { SpringApplication.run(ConfigServer3344.class,args); log.info("****** ConfigServer3344服务 启动成功 *****"); } }
-
在cloud-config-server3344项目的resources目录下,创建并编写application.yml文件
1)代码如下server: port: 3344 eureka: instance: # 注册的实例名 instance-id: cloud-config-server3344 client: service-url: # Eureka server的地址 #单机 #defaultZone: http://localhost:7001/eureka/ #集群 defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka spring: application: #设置应用名 name: cloud-config-server cloud: config: server: git: # git仓库地址 uri: https://gitee.com/zzx0402/cloud-config3344.git # 占位符url search-paths: - cloud-config # git仓库上面的分支名字 label: master
2)复制gitee的https的仓库链接,复制到上面uri那里
-
测试
1)启动eureka7001和eureka7002以及config3344服务
2)在浏览器中访问:http://localhost:3344/config-test.yml
4、SpringCloudConfig配置中心客户端的的搭建
-
Config配置读取规则
Config支持的请求的参数规则
1)/{application}/{profile}[/{label}]
2)/{application}-{profile}.yml
3)/{label}/{application}-{profile}.yml
4)/{application}-{profile}.properties
5)/{label}/{application}-{profile}.properties
其中{application} 就是应用名称;{profile} 就是配置文件的版本(dev、test、prod);{label} 表示 git 分支 -
在父工程cloud下,创建子模块项目cloud-config-client3355
-
在config3355项目下的POM文件中添加如下依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud</artifactId> <groupId>com.zzx</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-config-client3355</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <!-- 引入Eureka client依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!-- 引入 config 依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency> <!-- 引入 web 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bootstrap</artifactId> </dependency> </dependencies> </project>
-
修改在config3355项目下com.zzx包下的主启动类Main,修改为ConfigClientMain3355 ,代码如下
package com.zzx; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @Slf4j public class ConfigClientMain3355 { public static void main(String[] args) { SpringApplication.run(ConfigClientMain3355.class,args); log.info("********** ConfigClientMain3355服务 启动成功 ********"); } }
-
在config3355项目中的resources目录下创建bootstrap.yml配置文件,配置如下:
server: port: 3355 eureka: instance: # 注册名 instance-id: cloud-config-client3355 client: service-url: # Eureka server的地址 #集群 defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka #单机 #defaultZone: http://localhost:7001/eureka/ spring: application: #设置应用名 name: cloud-config-client cloud: config: # 分支名字 label: master # 应用名字 name: config # 环境名 profile: dev # config server 地址 uri: http://localhost:3344
-
在config3355项目中的com.zzx.controller包下创建配置控制层类ConfigController,代码如下:
package com.zzx.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ConfigController { @Value("${config.info}") private String configInfo; /** * 读取配置文件的内容 * @return */ @GetMapping("getConfigInfo") public String getConfigInfo(){ return configInfo; } }
-
测试
1)启动Eureka7001和Eureka7002以及config3344和config3355服务
2)在浏览器中访问:http://localhost:3355/getConfigInfo
5、SpringCloudConfig配置中心客户端动态刷新配置文件
-
在gitee上修改配置文件后,config3344,也就是服务器端可以马上更新;但是config3355,也就是客户端没有更新。
即需要实现客户端config3355的动态刷新配置文件的功能。 -
在config3355项目中的POM文件中添加actuator监控依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
-
在config3355项目中的bootstrap.yml文件中添加暴露监控端口的配置
management: endpoints: web: exposure: include: "*"
-
在config3355项目中的com.zzx.controller包下的ConfigController类上添加如下注解
@RefreshScope
-
测试
1)配置完后重启config3355服务后
在gitee中修改版本号
发现在浏览器上访问客户端一样是不会刷新,因为需要访问特定的url,以及使用POST请求。
2)使用Postman工具来发送POST请求,进行手动刷新
3)再次打开浏览器访问:http://localhost:3355/getConfigInfo
此时已经更新。
二、SpringCloudBus消息总线
1、SpringCloudBus消息总线的概论
- Spring Cloud Bus通过建立多个应用之间的通信频道,管理和传播应用间的消息,从技术角度来说,应用了AMQP消息代理作为通道,通过MQ的广播机制实现消息的发送和接收。Bus支持两种消息代理:RabbitMQ和Kafka 。
Spring Cloud Bus组件主要解决配置中心客户端手动刷新的问题。 - Spring Cloud Bus做配置更新的步骤:
1)修改配置文件,提交代码触发post给客户端A发送bus/refresh
2)客户端A接收到请求从Server端更新配置并且发送给Spring Cloud Bus
3)Spring Cloud bus接到消息并通知给其它客户端
4)其它客户端接收到通知,请求Server端获取最新配置
5)全部客户端均获取到最新的配置
即修改配置文件后,A更新并且通知消息总线;消息总线再通过广播发送消息给其他客户端,其他客户端更新。
2、虚拟机安装RabbitMQ
- 打开centos7的虚拟机,使用mobax终端连接该虚拟机,安装docker:
yum install docker
- 启动dokcer:
systemctl start docker
- 使用docker指令拉取RabbitMQ消息中间件的镜像:
docker pull docker.io/macintoshplus/rabbitmq-management
- 启动RabbitMQ:
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 docker.io/macintoshplus/rabbitmq-management
即后台启动RabbitMQ,指定名字为rabbitmq,用户名和密码都为guest,可视化RabbitMQ端口15672,服务端口为5672,镜像名字为docker.io/macintoshplus/rabbitmq-management。 - IP是虚拟机的IP地址,浏览器访问:
http://192.168.126.11:15672/
- 此时用户名和密码都是上面指定的guest,登录后如图
3、SpringCloudBus动态刷新全局广播
-
创建父项目cloud的子模块项目cloud-config-client3366
1)直接复制cloud-config-client3355,粘贴到cloud父项目中,修改项目名字为cloud-config-client3366
2)修改cloud-config-client3366的POM文件中的项目名为cloud-config-client3366
3)右键cloud-config-client3366的POM文件,选择AddasMavenProject
4)将主启动类cloud-config-client3355都修改为cloud-config-client3366
5)修改bootstrap.yml文件的端口号和注册的实例名
-
给config3344、config3355、config3366项目的POM文件添加RabiitMQ代理的消息总线BUS的依赖
<!-- 消息总线BUS --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>
-
给config3344、config3355、config3366项目的yml文件添加rabbitmq的配置信息
spring: rabbitmq: host: 192.168.126.11 port: 5672 username: guest password: guest
-
测试实现一次手动刷新配置,所有客户端都能刷新配置
1)启动eureka7001和eureka7002和config3344和config3355以及config3366服务
2)更新gitee上配置信息的版本号为3,然后提交
3)使用postman请求发送post请求更新配置信息
发送完成后,config3355和config3366服务都更新了配置,即只需要发送一个,其他客户端也会更新。 -
测试只刷新一个服务的配置
1)更新gitee上配置信息的版本号为4,然后提交
2)使用postman请求发送post请求更新配置信息
即在全部更新的url路径上加上指定的应用名:端口号,即可实现单一服务的配置刷新。也就是说3355的版本号更新为4了,而3366不更新,还是3。
三、SpringCloudStream消息驱动
1、SpringCloudStream消息驱动的概论
- SpringCloudStream 是一个构建消息驱动微服务的框架。实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
- SpringCloudStream组件主要解决屏蔽底层消息中间件的差异问题。
2、SpringCloudStream消息驱动的消息生产者
-
在cloud父工程下创建cloud-stream-rabbitmq-provider8001
-
在cloud-stream-rabbitmq-provider8001项目中的POM文件中导入依赖
即导入这两个依赖的任意一个即可。<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud</artifactId> <groupId>com.zzx</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8001</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- 引入Eureka client依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> </dependencies> </project>
-
在cloud-stream-rabbitmq-provider8001项目的com.zzx包下,修改主启动类Main的名字为StreamMain8001,修改代码如下
package com.zzx; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication @Slf4j public class StreamMain8001 { public static void main(String[] args) { SpringApplication.run(StreamMain8001.class,args); log.info("******** StreamMain8001 启动成功 *******"); } }
-
在cloud-stream-rabbitmq-provider8001项目的resources目录下,创建application.yml配置文件,添加如下配置
server: port: 8001 eureka: instance: # 注册的实例名 instance-id: cloud-stream-provider8001 client: service-url: # Eureka server的地址 #单机 #defaultZone: http://localhost:7001/eureka/ #集群 defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka spring: application: #设置应用名 name: cloud-stream-provider rabbitmq: host: 192.168.126.11 port: 5672 username: guest password: guest cloud: stream: bindings: # 广播消息 生产者绑定名称,myBroadcast是自定义的绑定名称,out代表生产者,0是固定写法 myBroadcast-out-0: # 对应的真实的 RabbitMQ Exchange destination: my-broadcast-topic
-
在cloud-stream-rabbitmq-provider8001项目的com.zzx包下创建common包,在common包下创建消息实体类MyMessage,添加如下代码
package com.zzx.common; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * 消息实体类 */ @Data @AllArgsConstructor @NoArgsConstructor @Builder public class MyMessage implements Serializable { /** * 消息体 */ private String payload; }
-
在cloud-stream-rabbitmq-provider8001项目的com.zzx包下创建service包,在service包下创建消息接口IMessageProvider,添加如下代码
package com.zzx.service; import org.springframework.stereotype.Service; /** * 发送消息的接口 */ public interface IMessageProvider { /** * 发送消息 * @param message 消息的内容 * @return */ String send(String message); }
-
在cloud-stream-rabbitmq-provider8001项目的com.zzx.service包下创建impl包,在impl包下创建消息实现类MessageProviderImpl,添加如下代码
package com.zzx.service.impl; import com.zzx.common.MyMessage; import com.zzx.service.IMessageProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Service; /** * 定义消息推送的管道 */ @Service public class MessageProviderImpl implements IMessageProvider { @Autowired private StreamBridge streamBridge; @Override public String send(String message) { MyMessage myMessage = new MyMessage(); myMessage.setPayload(message); /** * 第一个参数为 生产者绑定的名称 * 第二个参数为 发送的消息实体 */ streamBridge.send("myBroadcast-out-0",myMessage); return "success"; } }
-
在cloud-stream-rabbitmq-provider8001项目的com.zzx包下创建controller包,在controller包下创建消息控制层类ProviderController ,添加如下代码
package com.zzx.controller; import com.zzx.service.IMessageProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProviderController { @Autowired private IMessageProvider iMessageProvider; /** * 发送消息 * @param message 消息的内容 * @return */ @GetMapping("send") public String send(String message){ return iMessageProvider.send(message); } }
-
测试
1) 启动eureka7001和eureka7002和stream8001服务
2)使用Postman测试
即测试消息生产者是否能接收到消息。
3)在postman发送请求成功后,查看可视化rabbitmq时,可以看到exchange会被自动创建。
3、SpringCloudStream消息驱动的消息消费者1
-
在cloud父工程下创建cloud-stream-rabbitmq-consumer8002
-
在cloud-stream-rabbitmq-consumer8002项目的POM文件中引入依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud</artifactId> <groupId>com.zzx</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-consumer8002</artifactId> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- 引入Eureka client依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> </dependencies> </project>
-
在cloud-stream-rabbitmq-consumer8002项目的resources目录下,创建application.yml配置文件,添加如下配置
server: port: 8002 eureka: instance: # 注册的实例名 instance-id: cloud-stream-consumer8002 client: service-url: # Eureka server的地址 #单机 #defaultZone: http://localhost:7001/eureka/ #集群 defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka spring: application: #设置应用名 name: cloud-stream-consumer rabbitmq: host: 192.168.126.11 port: 5672 username: guest password: guest cloud: stream: bindings: # 广播消息 消费者绑定名称,myBroadcast是自定义的绑定名称,out代表生产者,in代表消费者,0是固定写法 myBroadcast-in-0: # 对应的真实的 RabbitMQ Exchange destination: my-broadcast-topic function: # 定义出消费者 definition: myBroadcast
-
将cloud-stream-rabbitmq-provider8001项目的com.zzx.common包以及该包下的类复制,粘贴cloud-stream-rabbitmq-consumer8002项目的com.zzx包下
-
在cloud-stream-rabbitmq-consumer8002项目的com.zzx.service包下,创建消费者类Consumer
package com.zzx.service; import com.zzx.common.MyMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component @Slf4j /** * 消费者类 */ public class Consumer { /** * 消费广播消息 * @return */ @Bean public java.util.function.Consumer<MyMessage> myBroadcast(){ return myMessage -> { log.info("接收到了广播消息:{}",myMessage.getPayload()); }; } }
-
在cloud-stream-rabbitmq-consumer8002项目的com.zzx下,修改主启动类Main的名字为StreamConsumerMain8002,代码如下
package com.zzx; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * 主启动类 */ @SpringBootApplication @Slf4j public class StreamConsumerMain8002 { public static void main(String[] args) { SpringApplication.run(StreamConsumerMain8002.class,args); log.info("********** StreamConsumerMain8002 启动成功 ********"); } }
4、SpringCloudStream消息驱动的消息消费者2
- 直接复制在cloud-stream-rabbitmq-consumer8002项目,粘贴到cloud父工程下,修改项目名为cloud-stream-rabbitmq-consumer8003。
- 在cloud-stream-rabbitmq-consumer8003项目POM修改项目名为cloud-stream-rabbitmq-consumer8003
- 在cloud-stream-rabbitmq-consumer8003项目右键选择AddasMavenProject
- 在cloud-stream-rabbitmq-consumer8003项目的application.yml文件中,修改8002为8003
- 在cloud-stream-rabbitmq-consumer8003项目的主启动类中修改8002为8003
- 在cloud父工程的POM文件中添加子模块cloud-stream-rabbitmq-consumer8003
- 测试
1)启动eureka7001和eureka7002和StreamMain8001和StreamConsumerMain8002以及StreamConsumerMain8003服务
2)在Postman工具发送请求到8001
此时消息生产者8001会将消息进行广播到消息消费者8002和8003;在IDEA中8002和8003在接收到广播的消息后,会在控制台中打印该信息。
5、SpringCloudStream消息驱动的分组消费
-
在cloud-stream-rabbitmq-consumer8002和cloud-stream-rabbitmq-consumer8003的yml文件中的bindings下添加如下配置信息
# 分组消息 myGroup-in-0: # 对应的真实的 RabbitMQ Exchange destination: my-group-topic # 同一分组的消费服务,只能有一个消费者消费到消息 group: my-group-0 function: # 定义出消费者 definition: myBroadcast;myGroup
此时跟前面的广播配置不同的是多了一个group,即分组。
-
在cloud-stream-rabbitmq-provider8001的yml文件中的bindings下添加如下配置信息
# 分组消费 生产者绑定名称,myGroup是自定义的绑定名称,out代表生产者,0是固定写法 myGroup-out-0: # 对应的真实的 RabbitMQ Exchange destination: my-group-topic
-
在cloud-stream-rabbitmq-consumer8002和cloud-stream-rabbitmq-consumer8003的com.zzx.service包中的Consumer类中添加分组消费方法
@Bean public java.util.function.Consumer<MyMessage> myGroup(){ return myMessage -> { log.info("接收到了分组消息:{}",myMessage.getPayload()); }; }
-
在cloud-stream-rabbitmq-provider8001项目中的com.zzx.service包中的IMessageProvider中,添加如下接口方法
String groupSend(String message);
-
在cloud-stream-rabbitmq-provider8001项目中的com.zzx.service.impl包中的MessageProviderImpl中,实现IMessageProvider接口的方法,代码如下
@Override public String groupSend(String message) { MyMessage myMessage = new MyMessage(); myMessage.setPayload(message); /** * 第一个参数为 生产者绑定的名称 * 第二个参数为 发送的消息实体 */ streamBridge.send("myGroup-out-0",myMessage); return "success"; }
-
在cloud-stream-rabbitmq-provider8001项目中的com.zzx.controller包中的ProviderController中,添加调用发送分组消息方法,代码如下
/** * 发送分组消息 * @param message 消息的内容 * @return */ @GetMapping("groupSend") public String groupSend(String message){ return iMessageProvider.groupSend(message); }
-
测试
1)启动eureka7001和eureka7002和StreamMain8001和StreamConsumerMain8002以及StreamConsumerMain8003服务
2)在Postman工具发送请求到8001
此时调用groupSend方法,也就是分组方法,即8002和8003这两个消费者每次只有一个会接收到分组消息并在IDEA控制台进行打印。
总结
- 1)在分布式系统中,由于服务数量巨多,为了方便服务配置文件统一管理,实时更新,所以需要分布式配置中心组件SpringCloudConfig。
即SpringCloudConfig项目是一个解决分布式系统的配置管理问题。
2)SpringCloudConfig的配置文件需要存在gitee上,需要创建gitee仓库并设置为开源;创建完gitee的仓库后,需要在本地创建文件夹来存储配置文件,然后在该文件夹中打开git命令行,将gitee中的HTTPS链接复制到该命令行中,从而拉取该gitee远程仓库;最后在创建yml配置文件后,将这些文件上传到gitee上。
3)SpringCloudConfig服务端的搭建,需要引入config的服务端和eureka客户端等依赖;以及yml文件需要配置gitee的HTTPS的远程仓库链接和分支等信息。config服务器端,即负责向Gitte远程仓库拉取配置文件的服务,以及将拉取过来的配置文件共享给config客户端。
4)SpringCloudConfig客户端的搭建,需要引入config客户端和actuator监控和eureka客户端以及bootstrap等依赖;以及需要在bootstrap.yml文件中配置SpringCloudConfig服务端的服务器地址、gitee中的分支名、应用名和环境名和暴露监控端口等。而且动态刷新时还需要给ConfigController类上添加@RefreshScope注解,这样才能使用POST请求通过访问本机ip:端口/actuator/refresh路径的方法进行手动刷新gitee上修改的配置。
手动刷新配置时,必须使用POST请求。 - 1)SpringCloudBus消息总线,即修改配置文件后,A更新后通知消息总线;消息总线再通过广播发送消息给其他客户端,其他客户端更新。
消息总线主要用来解决配置中心客户端的手动刷新;可以用RabbitMQ或Kafka来作为代理。
即一个客户端更新,全部客户端都会实现更新操作,而不用一个一个的手动更新。
2)使用Centos7虚拟机,然后用docker来安装RabbitMQ。
3)SpringCloudBus动态刷新全局广播的实现流程,需要给config的服务端和客户端的POM文件加上Bus的依赖和yml配置文件加上rabbitmq的配置。动态刷新全局广播,即一个客户端发送刷新消息到Bus消息总线即可实现全部客户端刷新配置文件。
也可以实现单一服务刷新配置,即在全局的url上加上应用名:端口号即可。 - 1)SpringCloudStream是一个构建消息驱动微服务的框架,主要解决屏蔽底层消息中间件的差异问题。
2)创建消息生产者项目,需要导入eureka客户端和stream-rabbit等依赖;配置rabbitmq信息和stream绑定生产者的交换机等信息。然后通过StreamBridge对象进行消息的发送等操作。
3)创建消息的消费者项目,也需要导入eureka客户端和stream-rabbit等依赖;配置rabbitmq信息和stream绑定生产者的交换机等信息,但是绑定消费者名称时,需要指定为in。
并且需要在definition属性指定消费者方法的名字;然后在类中需要实现这个消费者方法并且在方法上加上@Bean注解,将该类上加上@Component注解,即将该类中的使用@Bean的方法交给SpringIOC容器。在消息生产者通过广播发送消息到消息消费者,Spring就会调用该方法。
4)分组消费,即在广播的基础上,在yml文件加上group属性,用于定义分组信息。在同一个分组内的所有消费者,当消费生产者每次发送消息只有一个消费者能接收到。 - 1)Config配置中心用来解决分布式系统的配置管理问题。
2)但是Config需要配置中心的客户端去手动刷新配置,然而Bus消息总线使用消息中间件解决配置中心客户端的手动刷新,也就是说将消息发送给Bus消息总线,Bus消息总线给所有配置中心客户端发送更新的消息,但是配置中心服务端和客户端都需要yml文件中配置消息中间件的信息,以此进行绑定。
3)Stream消息驱动用来解决屏蔽底层消息中间件的差异问题。即解决消息中间件不一致的问题,因为消息中间件有很多种。