Dynamic Data Source Routing for MySQL, MongoDB, and Redis During Full-Link Stress Testing
Contents
1. Configuration: application-localDynamic.yml
2. Loading the configuration: DynamicDataSourceProperties.java
3. Dynamic data source: DynamicDataSource.java
4. Data source provider: DynamicDataSourceProvider.java
6. Data source context: DynamicDataSourceContextHolder.java
7. Data source filter: DynamicDataSourceFilter.java
1. Configuration: application-localDynamicMongo.yml
2. Loading the configuration: DynamicMongoSourceProperties.java
3. Dynamic data source: DynamicMongoSource.java
4. Data source provider: DynamicMongoSourceProvider.java
6. Data source context: DynamicMongoSourceContextHolder.java
7. Data source filter: DynamicMongoSourceFilter.java
1. Configuration: application-localDynamicRedis.yml
2. Loading the configuration: DynamicRedisSourceProperties.java
3. Dynamic data source: DynamicRedisSource.java
4. Data source provider: DynamicRedisSourceProvider.java
6. Data source context: DynamicRedisSourceContextHolder.java
7. Data source filter: DynamicRedisSourceFilter.java
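I. Full-Link Stress Testing
Verifying that the maximum load a system can bear is close to expectations, and that it can withstand a surge of heavy traffic, is no easy task. Anyone with distributed-systems experience knows that stress testing a single interface or subsystem cannot accurately reveal the throughput of the system as a whole, and that such tests place very strict demands on the environment. Full-link stress testing exists to ensure that the core links stay stable end to end when a major promotion arrives.
The question is how to run a full-link stress test against the production environment shortly before a major promotion, so that capacity planning and performance tuning are guided by measurements. Two tasks before a promotion are both the most basic and the most difficult:
- estimating how many machines to add;
- verifying that overall system capacity can support the estimated traffic peak.
First, map out the core links of the system. Second, measure the maximum traffic a single machine can handle. Third, work out how many machines to add, based on GMV (Gross Merchandise Volume) or on a peak-traffic estimate derived from historical data. Finally, stress test the system as a whole, since only that exposes problems such as slow SQL, exhausted connection resources (for example the DB connection pool), and large numbers of threads waiting on locks (exclusive locks, distributed locks, DB row locks).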
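The sizing step comes down to simple arithmetic. The following is a minimal sketch, not part of the original project: the class name CapacityPlan, the utilization parameter, and the sample numbers are all assumptions chosen for illustration. It divides the estimated peak by the throughput one machine can safely carry and rounds up.

```java
/** Hypothetical sizing helper; names and numbers are illustrative only. */
final class CapacityPlan {

    private CapacityPlan() {
    }

    /**
     * Machines needed to serve peakQps when each machine is loaded only up to
     * targetUtilizationPercent of its measured maximum throughput.
     */
    static long machinesNeeded(long peakQps, long perMachineMaxQps, int targetUtilizationPercent) {
        if (peakQps < 0 || perMachineMaxQps <= 0
                || targetUtilizationPercent <= 0 || targetUtilizationPercent > 100) {
            throw new IllegalArgumentException("invalid capacity inputs");
        }
        long usableQps = perMachineMaxQps * targetUtilizationPercent / 100;
        if (usableQps == 0) {
            throw new IllegalArgumentException("usable throughput per machine is zero");
        }
        // Ceiling division: a fractional machine still means one more machine.
        return (peakQps + usableQps - 1) / usableQps;
    }

    /** Machines to add on top of the current fleet; never negative. */
    static long machinesToAdd(long peakQps, long perMachineMaxQps,
                              int targetUtilizationPercent, long currentMachines) {
        long needed = machinesNeeded(peakQps, perMachineMaxQps, targetUtilizationPercent);
        return Math.max(0, needed - currentMachines);
    }

    public static void main(String[] args) {
        // Assumed figures: 10,000 QPS peak, 800 QPS per machine, 80% target load, 10 machines today.
        System.out.println(machinesToAdd(10_000, 800, 80, 10));
    }
}
```

The result is only an estimate; the full-link stress test is what confirms whether the enlarged fleet actually holds the peak.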
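Stress testing in production carries considerable risk and leaves no room for error. Difficult as it is, it is the way to measure the real traffic capacity of the system. Never test during peak hours; the safe approach is to test during off-peak hours and to control the direction of user traffic at the Nginx layer. The key parts of a production full-link stress test are:
- How to mark traffic as user traffic or stress-test traffic: for example with a request header or a URL request parameter;
- How to divert stress-test data into an isolated environment: for example with a flag column in the same table, or by dynamic routing to an isolated environment (the approach covered here);
- How to construct stress-test data: for example auto-generated data, or production data with sensitive fields filtered out;
- How to upgrade and adapt business systems and middleware: for example scheduled jobs, message middleware, and external interfaces;
- How to generate very large volumes of traffic: for example a JMeter cluster or the distributed load tester nGrinder.
The hardest part of stress testing is constructing and isolating the test data. This article implements dynamic routing for MySQL, MongoDB, and Redis. When user traffic and stress-test traffic reach the same server, dynamic routing keeps user data and stress-test data fully isolated, so that user data is not polluted and no data-safety incident results.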
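II. Dynamic Routing for MySQL
To implement dynamic routing for MySQL, start with the class org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource. It was introduced in Spring 2.0.1 (Spring Framework, not Spring Boot 2.0.1) and acts as a routing intermediary for DataSource: at runtime it switches to the real DataSource according to a lookup key.
The logic of AbstractRoutingDataSource is roughly this: prepare all data sources in advance and store them in a Map whose key is the data source name and whose value is the data source itself; configure that Map on AbstractRoutingDataSource; then, on every database operation, the class obtains a key and uses it to find the data source that executes the operation.
MongoDB and Redis have no equivalent class, so I implemented routing classes on top of their respective factories to perform the switch; see the sections that follow.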
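The map-plus-key lookup described above can be sketched without Spring. This is a toy model under stated assumptions: the class RoutingSketch and its method names are hypothetical, and plain strings stand in for real DataSource objects. Falling back to the default target for an unknown key mirrors the lenient fallback that AbstractRoutingDataSource applies by default.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of key-based routing; strings stand in for javax.sql.DataSource objects. */
final class RoutingSketch {

    // The lookup key for the current thread, set per request.
    private static final ThreadLocal<String> CURRENT_KEY = new ThreadLocal<>();

    private final Map<String, String> targets;
    private final String defaultTarget;

    RoutingSketch(Map<String, String> targets, String defaultKey) {
        this.targets = new HashMap<>(targets);
        this.defaultTarget = targets.get(defaultKey);
        if (this.defaultTarget == null) {
            throw new IllegalArgumentException("no target registered for default key " + defaultKey);
        }
    }

    static void useKey(String key) {
        CURRENT_KEY.set(key);
    }

    static void clearKey() {
        CURRENT_KEY.remove();
    }

    /** Resolves the target for the current thread, falling back to the default. */
    String currentTarget() {
        String key = CURRENT_KEY.get();
        String target = (key == null) ? null : targets.get(key);
        return (target != null) ? target : defaultTarget;
    }
}
```

With targets "master" and "test" registered, a thread that calls useKey("test") resolves the test target, while every other thread keeps resolving the default.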
1. Configuration: application-localDynamic.yml
spring:
  dynamic-data-source:
    druid:
      # dsKey: test
      ds:
        # Primary data source; the default key "master" must not be changed
        master:
          url: jdbc:mysql://${remote.ip}:3307/prod?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
          username: root
          password: 123456
        # Stress-test data source
        test:
          url: jdbc:mysql://${remote.ip}:3307/test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8
          username: root
          password: 123456
      driverClassName: com.mysql.cj.jdbc.Driver
      type: mysql
      initialSize: 10
      keepAlive: true
      minIdle: 10
      maxActive: 50
      maxWait: 60000
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      maxEvictableIdleTimeMillis: 306000
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      filters: stat,wall,log4j
      poolPreparedStatements: false
      maxPoolPreparedStatementPerConnectionSize: -1
      useGlobalDataSourceStat: true
      connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=1000
2. Loading the configuration: DynamicDataSourceProperties.java
package com.common.instance.demo.config.dynamicDataSource;

import com.alibaba.druid.pool.DruidDataSource;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Loads the dynamic data source configuration
 * @date 2023/3/14 15:13
 **/
@Data
@Component
@ConfigurationProperties(prefix = "spring.dynamic-data-source.druid")
public class DynamicDataSourceProperties {
    private int initialSize;
    private int minIdle;
    private int maxActive;
    private int maxWait;
    private int timeBetweenEvictionRunsMillis;
    private int minEvictableIdleTimeMillis;
    private int maxEvictableIdleTimeMillis;
    private String validationQuery;
    private boolean testWhileIdle;
    private boolean testOnBorrow;
    private boolean testOnReturn;
    private Map<String, Map<String, String>> ds;
    private String dsKey;

    public DruidDataSource dataSource(DruidDataSource datasource) {
        // Initial, maximum, and minimum pool size
        datasource.setInitialSize(initialSize);
        datasource.setMaxActive(maxActive);
        datasource.setMinIdle(minIdle);
        // Maximum time to wait for a connection, in milliseconds
        datasource.setMaxWait(maxWait);
        // Interval between checks for idle connections that should be closed, in milliseconds
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        // Minimum and maximum time a connection may stay idle in the pool, in milliseconds
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);
        // SQL used to check that a connection is valid; it must be a query, commonly select 'x'.
        // If validationQuery is null, testOnBorrow, testOnReturn, and testWhileIdle have no effect.
        datasource.setValidationQuery(validationQuery);
        // Recommended: true. It has little performance cost and keeps connections safe. When a connection
        // is requested and has been idle longer than timeBetweenEvictionRunsMillis, validationQuery is run.
        datasource.setTestWhileIdle(testWhileIdle);
        // Run validationQuery whenever a connection is borrowed; this lowers performance.
        datasource.setTestOnBorrow(testOnBorrow);
        // Run validationQuery whenever a connection is returned; this lowers performance.
        datasource.setTestOnReturn(testOnReturn);
        return datasource;
    }
}
3. Dynamic data source: DynamicDataSource.java
This class extends org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource and holds the core logic of the routing switch.
package com.common.instance.demo.config.dynamicDataSource;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import java.util.HashMap;
import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Dynamic data source
 * @date 2023/3/14 15:36
 **/
public class DynamicDataSource extends AbstractRoutingDataSource {
    private final DynamicDataSourceProvider dynamicDataSourceProvider;
    private final DynamicDataSourceProperties dynamicDataSourceProperties;

    public DynamicDataSource(DynamicDataSourceProvider dynamicDataSourceProvider, DynamicDataSourceProperties dynamicDataSourceProperties) {
        this.dynamicDataSourceProvider = dynamicDataSourceProvider;
        this.dynamicDataSourceProperties = dynamicDataSourceProperties;
        // Load all target data sources once; a second loadDataSources() call would create every pool again
        Map<Object, Object> targetDataSources = new HashMap<>(dynamicDataSourceProvider.loadDataSources());
        super.setTargetDataSources(targetDataSources);
        // Set the default data source
        super.setDefaultTargetDataSource(targetDataSources.get(DynamicDataSourceProvider.DEFAULT_DATASOURCE));
        super.afterPropertiesSet();
    }

    @Override
    protected Object determineCurrentLookupKey() {
        // return dynamicDataSourceProperties.getDsKey() == null ? DynamicDataSourceProvider.DEFAULT_DATASOURCE:dynamicDataSourceProperties.getDsKey();
        return DynamicDataSourceContextHolder.getDataSourceType();
    }
}
4. Data source provider: DynamicDataSourceProvider.java
package com.common.instance.demo.config.dynamicDataSource;

import javax.sql.DataSource;
import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Dynamic data source provider interface
 * @date 2023/3/14 15:21
 **/
public interface DynamicDataSourceProvider {
    // Key of the default data source
    String DEFAULT_DATASOURCE = "master";

    /**
     * Loads all data sources.
     * @return data sources keyed by name
     */
    Map<String, DataSource> loadDataSources();
}

package com.common.instance.demo.config.dynamicDataSource;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.Data;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Dynamic data source configuration
 * @date 2023/3/17 9:15
 **/
@Data
@Configuration
@EnableConfigurationProperties(DynamicDataSourceProperties.class)
public class YamlDynamicDataSourceProvider implements DynamicDataSourceProvider {
    @Resource
    private DynamicDataSourceProperties dynamicDataSourceProperties;

    @Override
    public Map<String, DataSource> loadDataSources() {
        Map<String, Map<String, String>> map = dynamicDataSourceProperties.getDs();
        Map<String, DataSource> ds = new HashMap<>(map.size());
        for (Map.Entry<String, Map<String, String>> entry : map.entrySet()) {
            try {
                // Build the Druid pool from url/username/password, then apply the shared pool settings
                DruidDataSource dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(entry.getValue());
                ds.put(entry.getKey(), dynamicDataSourceProperties.dataSource(dataSource));
            } catch (Exception e) {
                // Fail fast: a silently missing data source would send traffic to the wrong database
                throw new IllegalStateException("Failed to create data source [" + entry.getKey() + "]", e);
            }
        }
        return ds;
    }
}
5. Dynamic data source bean
package com.common.instance.demo.config.dynamicDataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.sql.DataSource;

/**
 * @author tcm
 * @version 1.0.0
 * @description Data source configuration
 * @date 2023/3/15 17:18
 **/
@Configuration
public class DataSourceBean {
    @Resource
    private DynamicDataSourceProperties dynamicDataSourceProperties;
    @Resource
    private DynamicDataSourceProvider dynamicDataSourceProvider;

    @Bean
    public DataSource getDataSource() {
        return new DynamicDataSource(dynamicDataSourceProvider, dynamicDataSourceProperties);
    }
}
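6. Data source context: DynamicDataSourceContextHolder.java
Purpose: the routing type (stress-test or user traffic) is cached in an InheritableThreadLocal so that threads spawned by the request thread inherit it. The value is copied when a child thread is created, so threads that already exist in a pool do not see later changes.
package com.common.instance.demo.config.dynamicDataSource;

import com.log.util.LogUtil;

/**
 * @author tcm
 * @version 1.0.0
 * @description Dynamic data source context
 * @date 2023/3/15 11:16
 **/
public class DynamicDataSourceContextHolder {
    /**
     * The variable is kept in an InheritableThreadLocal. Each thread that uses it gets its own copy,
     * so a thread can change its copy without affecting the copies of other threads;
     * a child thread receives the parent's value at creation time.
     */
    private static final ThreadLocal<String> CONTEXT_HOLDER = new InheritableThreadLocal<>();

    /**
     * Sets the data source key.
     */
    public static void setDataSourceType(String dsType) {
        LogUtil.info(String.format("Switching to data source %s", dsType));
        CONTEXT_HOLDER.set(dsType);
    }

    /**
     * Returns the data source key.
     */
    public static String getDataSourceType() {
        return CONTEXT_HOLDER.get();
    }

    /**
     * Clears the data source key.
     */
    public static void clearDataSourceType() {
        CONTEXT_HOLDER.remove();
    }
}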
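The difference between a plain ThreadLocal and an InheritableThreadLocal can be observed in isolation. The sketch below is illustrative only and not part of the original project; the class InheritanceDemo and its method are hypothetical names. It sets the same value in both kinds of holder and reports what a freshly created child thread sees.

```java
/** Illustrative only: compares what a new child thread sees in each kind of holder. */
final class InheritanceDemo {

    private static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    private static final ThreadLocal<String> INHERITABLE = new InheritableThreadLocal<>();

    private InheritanceDemo() {
    }

    /** Returns {value seen via ThreadLocal, value seen via InheritableThreadLocal}. */
    static String[] valuesSeenByChild(String value) {
        PLAIN.set(value);
        INHERITABLE.set(value);
        String[] seen = new String[2];
        try {
            // The child is created after the values are set, so inheritance applies.
            Thread child = new Thread(() -> {
                seen[0] = PLAIN.get();
                seen[1] = INHERITABLE.get();
            });
            child.start();
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Always clean up so the calling thread does not keep the values.
            PLAIN.remove();
            INHERITABLE.remove();
        }
        return seen;
    }
}
```

The child sees null through the plain holder and the parent's value through the inheritable one. Because the copy happens only at thread creation, tasks handed to an existing thread pool need the key passed to them explicitly.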
7. Data source filter: DynamicDataSourceFilter.java
This filter uses a request header to tell user traffic from stress-test traffic; stress-test requests are switched to the stress-test MySQL database.
package com.common.instance.demo.config.dynamicDataSource;

import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * @author tcm
 * @version 1.0.0
 * @description Dynamic data source filter
 * @date 2023/3/15 15:07
 **/
@Component
@WebFilter(filterName = "dynamicDataSourceFilter", urlPatterns = "/*")
@Order(-10)
public class DynamicDataSourceFilter extends OncePerRequestFilter {
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        // Read the stress-test marker
        String testFlag = request.getHeader("Test-Flag");
        try {
            // If the marker is present, switch to the stress-test data source
            if (testFlag != null) {
                DynamicDataSourceContextHolder.setDataSourceType("test");
            }
            // Continue along the filter chain
            filterChain.doFilter(request, response);
        } finally {
            // Worker threads are pooled: clear the key so it cannot leak into the next request
            DynamicDataSourceContextHolder.clearDataSourceType();
        }
    }
}
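Servlet containers reuse worker threads, so a routing key left in a thread-local after one request can leak into the next request handled by the same thread. The sketch below is a simplified simulation under stated assumptions: StaleContextDemo is a hypothetical class, and a single-thread executor stands in for the container's worker pool. It runs a flagged request followed by an unflagged one on the same thread, with and without cleanup.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Simulation only: one reused worker thread handles two consecutive requests. */
final class StaleContextDemo {

    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    private StaleContextDemo() {
    }

    /** Simulates one request and returns the key the business code would observe. */
    static String handle(boolean stressFlag, boolean clearAfterwards) {
        try {
            if (stressFlag) {
                KEY.set("test");
            }
            String key = KEY.get();
            return key == null ? "master" : key;
        } finally {
            if (clearAfterwards) {
                KEY.remove();
            }
        }
    }

    /** Runs a flagged request, then an unflagged one, on the same worker thread. */
    static String secondRequestSees(boolean clearAfterwards) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(() -> handle(true, clearAfterwards)).get();
            return worker.submit(() -> handle(false, clearAfterwards)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }
}
```

Without cleanup the second, unflagged request still resolves to "test", which in this setup would mean user data written to the stress-test database; with cleanup in a finally block it resolves to "master".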
8. Testing the dynamic routing
# Route to the production MySQL
curl --request POST \
  --url http://127.0.0.1:9013/instance-demo/dynamicDataSource/insertData \
  --header 'content-type: application/json' \
  --data '{
  "tabId":"fde7c1d4cad049c89612afb6c2c2979",
  "transactionId":"7F0000013B2818B4AAC22CE1BDA20004"
}'
# Route to the stress-test MySQL
curl --request POST \
  --url http://127.0.0.1:9013/instance-demo/dynamicDataSource/insertData \
  --header 'Test-Flag: true' \
  --header 'content-type: application/json' \
  --data '{
  "tabId":"fde7c1d4cad049c89612afb6c2c2979",
  "transactionId":"7F0000013B2818B4AAC22CE1BDA20004-t"
}'
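The same two requests can also be built from Java, which is convenient when a load generator is written in code. This is a minimal sketch that assumes Java 11 or later for java.net.http; the class StressRequestFactory and its parameters are hypothetical, while the path and the Test-Flag header are taken from the curl commands above. The sketch only builds the request and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that builds the insertData request with or without the stress-test marker. */
final class StressRequestFactory {

    private StressRequestFactory() {
    }

    static HttpRequest insertDataRequest(String baseUrl, String jsonBody, boolean stressTest) {
        HttpRequest.Builder builder = HttpRequest
                .newBuilder(URI.create(baseUrl + "/instance-demo/dynamicDataSource/insertData"))
                .header("content-type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        if (stressTest) {
            // The filter only checks that the header is present; the value itself is not inspected.
            builder.header("Test-Flag", "true");
        }
        return builder.build();
    }
}
```

A request built this way can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).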
III. Dynamic Routing for MongoDB
The routing principle is the same as for MySQL. A custom abstract routing factory, AbstractRoutingMongoSource, routes to different MongoDB data sources by key. Its implementation, DynamicMongoSource, is injected into MongoTemplate to perform the switch.
1. Configuration: application-localDynamicMongo.yml
spring:
  dynamic-mongo-source:
    # dsKey: test
    ds:
      # Primary data source; the default key "master" must not be changed
      master:
        hosts: ${remote.ip}
        ports: 27018
        database: demo
        username: admin
        password: 123456
        authentication-database: admin
        connections-per-host: 100
        min-connections-per-host: 1
        maxConnectionIdleTime: 150000
        maxConnectionLifeTime: 150000
        connectTimeout: 6000
        socketTimeout: 10000
      # Stress-test data source
      test:
        hosts: ${remote.ip}
        ports: 27018
        database: test
        username: admin
        password: 123456
        authentication-database: admin
        connections-per-host: 100
        min-connections-per-host: 1
        maxConnectionIdleTime: 150000
        maxConnectionLifeTime: 150000
        connectTimeout: 6000
        socketTimeout: 10000
2. Loading the configuration: DynamicMongoSourceProperties.java
package com.common.instance.demo.config.dynamicMongoSource;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Loads the dynamic Mongo data source configuration
 * @date 2023/3/17 9:21
 **/
@Data
@Component
@ConfigurationProperties(prefix = "spring.dynamic-mongo-source")
public class DynamicMongoSourceProperties {
    private Map<String, MongoSettingsProperties> ds;
    private String dsKey;
}

package com.common.instance.demo.config.dynamicMongoSource;

import lombok.Data;
import org.hibernate.validator.constraints.NotBlank;
import org.hibernate.validator.constraints.NotEmpty;
import org.springframework.validation.annotation.Validated;

import java.util.List;

/**
 * @author tcm
 * @version 1.0.0
 * @description Mongo settings
 * @date 2023/3/17 9:14
 **/
@Validated
@Data
public class MongoSettingsProperties {
    @NotBlank
    private String database;
    @NotEmpty
    private List<String> hosts;
    @NotEmpty
    private List<Integer> ports;
    private String replicaSet;
    private String username;
    private String password;
    private String authenticationDatabase;
    private Integer minConnectionsPerHost = 10;
    private Integer connectionsPerHost = 20;
    private Integer maxConnectionIdleTime;
    private Integer maxConnectionLifeTime;
    private Integer connectTimeout;
    private Integer socketTimeout;
}
3. Dynamic data source: DynamicMongoSource.java
DynamicMongoSource extends the custom abstract routing factory AbstractRoutingMongoSource, which routes to a MongoDB data source by key, and is the factory handed to MongoTemplate.
package com.common.instance.demo.config.dynamicMongoSource;

import org.springframework.data.mongodb.MongoDatabaseFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Mongo dynamic data source
 * @date 2023/3/17 11:22
 **/
public class DynamicMongoSource extends AbstractRoutingMongoSource {
    private final DynamicMongoSourceProvider dynamicMongoSourceProvider;
    private final DynamicMongoSourceProperties dynamicMongoSourceProperties;

    public DynamicMongoSource(DynamicMongoSourceProvider dynamicMongoSourceProvider, DynamicMongoSourceProperties dynamicMongoSourceProperties) {
        this.dynamicMongoSourceProvider = dynamicMongoSourceProvider;
        this.dynamicMongoSourceProperties = dynamicMongoSourceProperties;
        // Load all target data sources once; a second loadMongoSources() call would create every client again
        Map<String, MongoDatabaseFactory> targetMongoSources = new HashMap<>(dynamicMongoSourceProvider.loadMongoSources());
        super.setTargetMongoSources(targetMongoSources);
        // Set the default data source
        super.setDefaultTargetMongoSource(targetMongoSources.get(DynamicMongoSourceProvider.DEFAULT_DATASOURCE));
    }

    @Override
    protected Object determineCurrentLookupKey() {
        // return dynamicMongoSourceProperties.getDsKey() == null ? DynamicMongoSourceProvider.DEFAULT_DATASOURCE:dynamicMongoSourceProperties.getDsKey();
        return DynamicMongoSourceContextHolder.getMongoSourceType();
    }
}

package com.common.instance.demo.config.dynamicMongoSource;

import com.mongodb.ClientSessionOptions;
import com.mongodb.client.ClientSession;
import com.mongodb.client.MongoDatabase;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.mongodb.MongoDatabaseFactory;

import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Abstract routing class for MongoDB
 * @date 2023/3/16 17:39
 **/
public abstract class AbstractRoutingMongoSource implements MongoDatabaseFactory {
    private Map<String, MongoDatabaseFactory> targetMongoSources;
    private MongoDatabaseFactory defaultTargetMongoSource;

    public void setTargetMongoSources(Map<String, MongoDatabaseFactory> targetMongoSources) {
        this.targetMongoSources = targetMongoSources;
    }

    public void setDefaultTargetMongoSource(MongoDatabaseFactory defaultTargetMongoSource) {
        this.defaultTargetMongoSource = defaultTargetMongoSource;
    }

    protected MongoDatabaseFactory determineTargetMongoSource() {
        if (this.targetMongoSources == null) {
            throw new IllegalArgumentException("Property 'targetMongoSources' is required");
        }
        Object lookupKey = determineCurrentLookupKey();
        MongoDatabaseFactory mongoSource = this.targetMongoSources.get(lookupKey);
        // Only a missing key falls back to the default; an unknown key is treated as an error
        if (mongoSource == null && lookupKey == null) {
            mongoSource = this.defaultTargetMongoSource;
        }
        if (mongoSource == null) {
            throw new IllegalStateException("Cannot determine target MongoTemplate for lookup key [" + lookupKey + "]");
        }
        return mongoSource;
    }

    protected abstract Object determineCurrentLookupKey();

    @Override
    public MongoDatabase getMongoDatabase() throws DataAccessException {
        return determineTargetMongoSource().getMongoDatabase();
    }

    @Override
    public MongoDatabase getMongoDatabase(String s) throws DataAccessException {
        return determineTargetMongoSource().getMongoDatabase(s);
    }

    @Override
    public PersistenceExceptionTranslator getExceptionTranslator() {
        // Delegate instead of returning null, so MongoTemplate can translate driver exceptions
        return determineTargetMongoSource().getExceptionTranslator();
    }

    @Override
    public ClientSession getSession(ClientSessionOptions clientSessionOptions) {
        return determineTargetMongoSource().getSession(clientSessionOptions);
    }

    @Override
    public MongoDatabaseFactory withSession(ClientSession clientSession) {
        return determineTargetMongoSource().withSession(clientSession);
    }
}
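The lookup rule in determineTargetMongoSource is stricter than a lenient fallback: a missing key selects the default target, but an unknown key raises an error rather than quietly hitting production. The generic sketch below isolates that rule. It is illustrative only; the class StrictRouter and the supplier-based key lookup are assumptions, not part of the original project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative strict router: null key means default, unknown key means error. */
final class StrictRouter<T> {

    private final Map<String, T> targets;
    private final T defaultTarget;
    private final Supplier<String> keySupplier;

    StrictRouter(Map<String, T> targets, T defaultTarget, Supplier<String> keySupplier) {
        this.targets = new HashMap<>(targets);
        this.defaultTarget = defaultTarget;
        this.keySupplier = keySupplier;
    }

    T resolve() {
        String key = keySupplier.get();
        // HashMap accepts a null key for lookup and simply returns null when it is absent.
        T target = targets.get(key);
        if (target == null && key == null) {
            target = defaultTarget;
        }
        if (target == null) {
            throw new IllegalStateException("No target for lookup key [" + key + "]");
        }
        return target;
    }
}
```

Failing on an unknown key is a deliberate safety choice for stress testing: a mistyped key stops the request instead of writing test data into the default database.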
4. Data source provider: DynamicMongoSourceProvider.java
package com.common.instance.demo.config.dynamicMongoSource;

import org.springframework.data.mongodb.MongoDatabaseFactory;

import java.util.Map;

/**
 * @author tcm
 * @version 1.0.0
 * @description Mongo dynamic data source provider interface
 * @date 2023/3/17 9:14
 **/
public interface DynamicMongoSourceProvider {
    // Key of the default data source
    String DEFAULT_DATASOURCE = "master";

    /**
     * Loads all data sources.
     * @return Mongo database factories keyed by name
     */
    Map<String, MongoDatabaseFactory> loadMongoSources();
}

package com.common.instance.demo.config.dynamicMongoSource;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import lombok.Data;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @author tcm
 * @version 1.0.0
 * @description Mongo dynamic data source configuration
 * @date 2023/3/17 9:15
 **/
@Data
@Configuration
@EnableConfigurationProperties(DynamicMongoSourceProperties.class)
public class YamlDynamicMongoSourceProvider implements DynamicMongoSourceProvider {
    @Resource
    private DynamicMongoSourceProperties dynamicMongoSourceProperties;

    @Override
    public Map<String, MongoDatabaseFactory> loadMongoSources() {
        Map<String, MongoSettingsProperties> map = dynamicMongoSourceProperties.getDs();
        Map<String, MongoDatabaseFactory> ds = new HashMap<>(map.size());
        for (Map.Entry<String, MongoSettingsProperties> entry : map.entrySet()) {
            ds.put(entry.getKey(), getMongoFactory(entry.getValue()));
        }
        return ds;
    }

    private MongoDatabaseFactory getMongoFactory(MongoSettingsProperties mongoSettingsProperties) {
        // MongoDB address list; hosts and ports are paired by position
        // (indexOf(host) would pick the wrong port when a host appears more than once)
        List<String> hosts = mongoSettingsProperties.getHosts();
        List<Integer> ports = mongoSettingsProperties.getPorts();
        List<ServerAddress> serverAddresses = new ArrayList<>();
        for (int i = 0; i < hosts.size(); i++) {
            serverAddresses.add(new ServerAddress(hosts.get(i), ports.get(i)));
        }
        // Authentication
        MongoCredential credential = MongoCredential.createCredential(mongoSettingsProperties.getUsername(),
                mongoSettingsProperties.getAuthenticationDatabase(),
                mongoSettingsProperties.getPassword().toCharArray());
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyToClusterSettings(builder -> builder.hosts(serverAddresses))
                .applyToConnectionPoolSettings(builder -> builder
                        .maxConnectionIdleTime(mongoSettingsProperties.getMaxConnectionIdleTime(), TimeUnit.MILLISECONDS)
                        .maxConnectionLifeTime(mongoSettingsProperties.getMaxConnectionLifeTime(), TimeUnit.MILLISECONDS))
                .credential(credential).build();
        // Create the client through the public MongoClients factory rather than the internal MongoClientImpl
        MongoClient mongoClient = MongoClients.create(settings);
        return new SimpleMongoClientDatabaseFactory(mongoClient, mongoSettingsProperties.getDatabase());
    }
}
5. Dynamic data source bean
package com.common.instance.demo.config.dynamicMongoSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;

import javax.annotation.Resource;

/**
 * @author tcm
 * @version 1.0.0
 * @description Mongo data source configuration
 * @date 2023/3/21 10:08
 **/
@Configuration
public class MongoSourceBean {
    @Resource
    private DynamicMongoSourceProperties dynamicMongoSourceProperties;
    @Resource
    private DynamicMongoSourceProvider dynamicMongoSourceProvider;

    @Bean
    public MongoTemplate getMongoSource() {
        // Dynamic data source
        DynamicMongoSource dynamicMongoSource = new DynamicMongoSource(dynamicMongoSourceProvider, dynamicMongoSourceProperties);
        // Omit the _class field when inserting documents
        DefaultDbRefResolver dbRefResolver = new DefaultDbRefResolver(dynamicMongoSource);
        MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, new MongoMappingContext());
        converter.setTypeMapper(new DefaultMongoTypeMapper(null));
        converter.setMapKeyDotReplacement("。");
        return new MongoTemplate(dynamicMongoSource, converter);
    }
}
6. Data source context: DynamicMongoSourceContextHolder.java
package com.common.instance.demo.config.dynamicMongoSource;

import com.log.util.LogUtil;

/**
 * @author tcm
 * @version 1.0.0
 * @description Mongo dynamic data source context
 * @date 2023/3/21 14:16
 **/
public class DynamicMongoSourceContextHolder {
    /**
     * The variable is kept in an InheritableThreadLocal. Each thread that uses it gets its own copy,
     * so a thread can change its copy without affecting the copies of other threads;
     * a child thread receives the parent's value at creation time.
     */
    private static final ThreadLocal<String> CONTEXT_MONGO_HOLDER = new InheritableThreadLocal<>();

    /**
     * Sets the data source key.
     */
    public static void setMongoSourceType(String dsType) {
        LogUtil.info(String.format("Switching mongo to data source %s", dsType));
        CONTEXT_MONGO_HOLDER.set(dsType);
    }

    /**
     * Returns the data source key.
     */
    public static String getMongoSourceType() {
        return CONTEXT_MONGO_HOLDER.get();
    }

    /**
     * Clears the data source key.
     */
    public static void clearMongoSourceType() {
        CONTEXT_MONGO_HOLDER.remove();
    }
}
7. Data source filter: DynamicMongoSourceFilter.java
package com.common.instance.demo.config.dynamicMongoSource;

import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * @author tcm
 * @version 1.0.0
 * @description Mongo dynamic data source filter
 * @date 2023/3/21 14:17
 **/
@Component
@WebFilter(filterName = "dynamicMongoSourceFilter", urlPatterns = "/*")
@Order(-10)
public class DynamicMongoSourceFilter extends OncePerRequestFilter {
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        // Read the stress-test marker
        String testFlag = request.getHeader("Test-Mongo-Flag");
        try {
            // If the marker is present, switch to the stress-test data source
            if (testFlag != null) {
                DynamicMongoSourceContextHolder.setMongoSourceType("test");
            }
            // Continue along the filter chain
            filterChain.doFilter(request, response);
        } finally {
            // Worker threads are pooled: clear the key so it cannot leak into the next request
            DynamicMongoSourceContextHolder.clearMongoSourceType();
        }
    }
}
8. Testing the dynamic routing
# Route to the production MongoDB
curl --request POST \
  --url http://127.0.0.1:9013/instance-demo/test/mongodb/listAll
# Route to the stress-test MongoDB
curl --request POST \
  --url http://127.0.0.1:9013/instance-demo/test/mongodb/listAll \
  --header 'Test-Mongo-Flag: true'
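IV. Dynamic Routing for Redis
The routing principle is the same as for MySQL. A custom abstract routing factory, AbstractRoutingRedisSource, routes to different Redis data sources by key. Its implementation, DynamicRedisSource, is injected into RedisTemplate to perform the switch.
1. Configuration: application-localDynamicRedis.yml
spring:
  dynamic-redis-source:
    # dsKey: test
    ds:
      # Primary data source; the default key "master" must not be changed
      master:
        address: redis://127.0.0.1:6379
        database: 1
        password: abcdef
      # Stress-test data source
      test:
        address: redis://127.0.0.1:6379
        database: 0
        password: abcdef
    config:
      singleServerConfig:
        # When the pool holds more connections than the minimum idle size and a connection has been idle longer than this value, it is closed and removed from the pool. In milliseconds.
        idleConnectionTimeout: 10000
        pingTimeout: 1000
        # Timeout for establishing a connection to any node. In milliseconds.
        connectTimeout: 10000
        # Time to wait for a node to reply to a command, counted from the moment the command was sent successfully.
        timeout: 3000
        # If the command still cannot be sent to the node after retryAttempts tries, an error is thrown. If sending succeeds within the limit, the timeout countdown starts.
        retryAttempts: 3
        # Interval before a failed command is sent again. In milliseconds.
        retryInterval: 1500
        # Interval before reconnecting after the connection to a node is lost. In milliseconds.
        reconnectionTimeout: 3000
        # When commands on a node fail failedAttempts times in a row, the node is removed from the list of available nodes until reconnectionTimeout has elapsed and it is tried again.
        failedAttempts: 3
        # Client name shown on the Redis node
        clientName: null
        # Minimum idle connections for publish/subscribe; default 1
        subscriptionConnectionMinimumIdleSize: 1
        # Publish/subscribe connection pool size; default 50
        subscriptionConnectionPoolSize: 100
        # Maximum subscriptions per connection; default 5
        subscriptionsPerConnection: 5
        # Minimum idle connections (kept as long-lived connections); default 10
        connectionMinimumIdleSize: 12
        # Maximum pool size; default 64. The pool grows and shrinks automatically.
        connectionPoolSize: 64
      # This thread pool is shared by all RTopic listeners, RRemoteService callers, and RExecutorService tasks.
      # Default: number of processor cores * 2
      # threads: 4
      ## Number of threads in the pool shared, within one Redisson instance, by all distributed data types and services it creates and by the underlying client.
      ## Default: number of processor cores * 2
      # nettyThreads: 0
      # Codec; default org.redisson.codec.JsonJacksonCodec. The codec serializes and deserializes objects so they can be stored in and read from Redis.
      # codec: !<org.redisson.codec.JsonJacksonCodec> { }
      # Transport mode; default TransportMode.NIO
      transportMode: NIO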
2. Loading the configuration: DynamicRedisSourceProperties.java
package com.common.instance.demo.config.dynamicRedisSource;

import lombok.Data;
import org.redisson.config.Config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author TCM
 * @version 1.0
 * @description Loads the dynamic Redis data source configuration
 * @date 2023/3/25 12:40
 **/
@Data
@Component
@ConfigurationProperties(prefix = "spring.dynamic-redis-source")
public class DynamicRedisSourceProperties {
    private Config config;
    private Map<String, Map<String, String>> ds;
    private String dsKey;
}
3. Dynamic data source: DynamicRedisSource.java
DynamicRedisSource extends the custom abstract routing factory AbstractRoutingRedisSource, which routes to a Redis data source by key, and is the factory handed to RedisTemplate.
package com.common.instance.demo.config.dynamicRedisSource;

import org.redisson.spring.data.connection.RedissonConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author TCM
 * @version 1.0
 * @description Redis dynamic data source
 * @date 2023/3/25 13:32
 **/
public class DynamicRedisSource extends AbstractRoutingRedisSource {
    private final DynamicRedisSourceProvider dynamicRedisSourceProvider;
    private final DynamicRedisSourceProperties dynamicRedisSourceProperties;

    public DynamicRedisSource(DynamicRedisSourceProvider dynamicRedisSourceProvider, DynamicRedisSourceProperties dynamicRedisSourceProperties) {
        this.dynamicRedisSourceProvider = dynamicRedisSourceProvider;
        this.dynamicRedisSourceProperties = dynamicRedisSourceProperties;
        try {
            // Load all target data sources once; a second loadRedisSources() call would create every client again
            Map<String, RedissonConnectionFactory> targetRedisSources = new HashMap<>(dynamicRedisSourceProvider.loadRedisSources());
            super.setTargetRedisSources(targetRedisSources);
            // Set the default data source
            super.setDefaultTargetRedisSource(targetRedisSources.get(DynamicRedisSourceProvider.DEFAULT_DATASOURCE));
            super.afterPropertiesSet();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected Object determineCurrentLookupKey() {
        // return dynamicRedisSourceProperties.getDsKey() == null ? DynamicRedisSourceProvider.DEFAULT_DATASOURCE:dynamicRedisSourceProperties.getDsKey();
        return DynamicRedisSourceContextHolder.getRedisSourceType();
    }
}

package com.common.instance.demo.config.dynamicRedisSource;

import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisSentinelConnection;

import java.util.Map;

/**
 * @author TCM
 * @version 1.0
 * @description Abstract routing class for Redis
 * @date 2023/3/25 13:18
 **/
public abstract class AbstractRoutingRedisSource extends RedissonConnectionFactory {
    private Map<String, RedissonConnectionFactory> targetRedisSources;
    private RedissonConnectionFactory defaultTargetRedisSource;

    public void setTargetRedisSources(Map<String, RedissonConnectionFactory> targetRedisSources) {
        this.targetRedisSources = targetRedisSources;
    }

    public void setDefaultTargetRedisSource(RedissonConnectionFactory defaultTargetRedisSource) {
        this.defaultTargetRedisSource = defaultTargetRedisSource;
    }

    protected RedisConnectionFactory determineTargetRedisSource() {
        if (this.defaultTargetRedisSource == null) {
            throw new IllegalArgumentException("Property 'defaultTargetRedisSource' is required");
        }
        Object lookupKey = determineCurrentLookupKey();
        RedissonConnectionFactory redisSource = this.targetRedisSources.get(lookupKey);
        // Only a missing key falls back to the default; an unknown key is treated as an error
        if (redisSource == null && lookupKey == null) {
            redisSource = this.defaultTargetRedisSource;
        }
        if (redisSource == null) {
            throw new IllegalStateException("Cannot determine target RedissonTemplate for lookup key [" + lookupKey + "]");
        }
        return redisSource;
    }

    protected abstract Object determineCurrentLookupKey();

    @Override
    public RedisConnection getConnection() {
        return determineTargetRedisSource().getConnection();
    }

    @Override
    public RedisClusterConnection getClusterConnection() {
        return determineTargetRedisSource().getClusterConnection();
    }

    @Override
    public RedisSentinelConnection getSentinelConnection() {
        return determineTargetRedisSource().getSentinelConnection();
    }
}
4. 动态数据源供应DynamicRedisSourceProvider.java
package com.common.instance.demo.config.dynamicRedisSource;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import java.util.Map;
/**
* @author tcm
* @version 1.0.0
* @description Redis动态数据源提供者接口
* @date 2023/3/25 12:51
**/
public interface DynamicRedisSourceProvider {
// 默认数据源
String DEFAULT_DATASOURCE = "master";
/**
* 加载所有的数据源
* @return
*/
Map<String, RedissonConnectionFactory> loadRedisSources();
}
package com.common.instance.demo.config.dynamicRedisSource;

import lombok.Data;
import org.apache.logging.log4j.util.Strings;
import org.redisson.Redisson;
import org.redisson.config.Config;
import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author TCM
 * @version 1.0
 * @description Loads the dynamic Redis data sources from the YAML configuration
 * @date 2023/3/25 12:54
 **/
@Data
@Configuration
@EnableConfigurationProperties(DynamicRedisSourceProperties.class)
public class YamlDynamicRedisSourceProvider implements DynamicRedisSourceProvider {

    @Resource
    private DynamicRedisSourceProperties dynamicRedisSourceProperties;

    @Override
    public Map<String, RedissonConnectionFactory> loadRedisSources() {
        Map<String, Map<String, String>> sources = dynamicRedisSourceProperties.getDs();
        Map<String, RedissonConnectionFactory> ds = new HashMap<>(sources.size());
        for (Map.Entry<String, Map<String, String>> entry : sources.entrySet()) {
            try {
                ds.put(entry.getKey(), getRedisFactory(entry.getValue()));
            } catch (Exception e) {
                // Fail fast: a silently missing data source would only surface at request time
                throw new IllegalStateException("Failed to create Redis data source '" + entry.getKey() + "'", e);
            }
        }
        return ds;
    }

    private RedissonConnectionFactory getRedisFactory(Map<String, String> props) {
        Config config = dynamicRedisSourceProperties.getConfig();
        String password = props.get("password");
        config.useSingleServer().setAddress(props.get("address"));
        // An empty password in the YAML file means "no password"
        config.useSingleServer().setPassword(Strings.EMPTY.equals(password) ? null : password);
        config.useSingleServer().setDatabase(Integer.parseInt(props.get("database")));
        return new RedissonConnectionFactory(Redisson.create(config));
    }
}
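The provider only builds the name-to-factory map; at request time the routing class has to pick one entry from it. The following is a minimal sketch of that lookup in plain Java, not the article's DynamicRedisSource. The class name RedisRoutingSketch, the addresses, and the fallback to "master" for a null or unknown key are assumptions made for illustration, and plain strings stand in for RedissonConnectionFactory.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the key-based lookup; strings replace the real connection factories. */
final class RedisRoutingSketch {

    static final String DEFAULT_DATASOURCE = "master";

    // Data source name -> target (a RedissonConnectionFactory in the real code)
    private final Map<String, String> sources;

    RedisRoutingSketch(Map<String, String> sources) {
        this.sources = new HashMap<>(sources);
    }

    /** Returns the target for the key, or the default target when the key is null or unknown (assumed behavior). */
    String resolve(String key) {
        String target = (key == null) ? null : sources.get(key);
        return target != null ? target : sources.get(DEFAULT_DATASOURCE);
    }

    public static void main(String[] args) {
        Map<String, String> sources = new HashMap<>();
        sources.put("master", "redis://127.0.0.1:6379"); // hypothetical production address
        sources.put("test", "redis://127.0.0.1:6380");   // hypothetical isolated address
        RedisRoutingSketch router = new RedisRoutingSketch(sources);

        System.out.println(router.resolve(null));   // user traffic: no key set
        System.out.println(router.resolve("test")); // pressure-test traffic
    }
}
```

Whether an unknown key should fall back to the default source or be rejected is a design decision: for pressure testing, rejecting it may be safer, because a fallback would send test writes to production data.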
5. Dynamic data source bean
package com.common.instance.demo.config.dynamicRedisSource;

import com.alibaba.fastjson.parser.ParserConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import javax.annotation.Resource;

/**
 * @author TCM
 * @version 1.0
 * @description Redis data source configuration
 * @date 2023/3/25 13:40
 **/
@Configuration
public class RedisSourceBean {

    @Resource
    private DynamicRedisSourceProvider dynamicRedisSourceProvider;

    @Resource
    private DynamicRedisSourceProperties dynamicRedisSourceProperties;

    @Bean("dynamicRedisTemplate")
    public RedisTemplate<String, Object> redisTemplate() {
        DynamicRedisSource dynamicRedisSource = new DynamicRedisSource(dynamicRedisSourceProvider, dynamicRedisSourceProperties);
        RedisTemplate<String, Object> template = new RedisTemplate<>();

        // ParserConfig must belong to the same fastjson library that the serializer uses (com.alibaba.fastjson).
        // Keep AutoType disabled globally; enabling it globally is not recommended.
        ParserConfig.getGlobalInstance().setAutoTypeSupport(false);
        // Instead, whitelist only a narrow package prefix
        ParserConfig.getGlobalInstance().addAccept("com.common");

        // Values are serialized with FastJson2JsonRedisSerializer
        FastJson2JsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJson2JsonRedisSerializer<>(Object.class);
        // Keys are serialized with StringRedisSerializer
        RedisSerializer<String> stringSerializer = new StringRedisSerializer();

        template.setKeySerializer(stringSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setValueSerializer(fastJsonRedisSerializer);
        template.setHashValueSerializer(fastJsonRedisSerializer);
        // The routing connection factory picks the real Redis per request
        template.setConnectionFactory(dynamicRedisSource);
        template.afterPropertiesSet();
        return template;
    }
}
package com.common.instance.demo.config.dynamicRedisSource;

import com.alibaba.fastjson.JSON;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * @description Custom Redis object serializer based on fastjson
 * @author tcm
 * @version 1.0.0
 * @date 2021/5/26 17:59
 **/
public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T> {

    private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

    private final Class<T> clazz;

    public FastJson2JsonRedisSerializer(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public byte[] serialize(T t) throws SerializationException {
        // A null value is stored as an empty byte array
        if (t == null) {
            return new byte[0];
        }
        return JSON.toJSONString(t).getBytes(DEFAULT_CHARSET);
    }

    @Override
    public T deserialize(byte[] bytes) throws SerializationException {
        // Missing or empty payloads map back to null
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        String str = new String(bytes, DEFAULT_CHARSET);
        return JSON.parseObject(str, clazz);
    }
}
6. Dynamic data source context: DynamicRedisSourceContextHolder.java
package com.common.instance.demo.config.dynamicRedisSource;

import com.log.util.LogUtil;

/**
 * @author TCM
 * @version 1.0
 * @description Context holder for the dynamic Redis data source
 * @date 2023/3/25 13:37
 **/
public class DynamicRedisSourceContextHolder {

    /**
     * The key is kept in a thread-local variable: every thread that uses it gets its own copy,
     * so a thread can change its copy without affecting the copies of other threads.
     * Because this is an InheritableThreadLocal, threads created by the current thread
     * start with a copy of its value.
     */
    private static final ThreadLocal<String> CONTEXT_REDIS_HOLDER = new InheritableThreadLocal<>();

    /**
     * Sets the data source key for the current thread
     */
    public static void setRedisSourceType(String dsType) {
        LogUtil.info(String.format("Redis switched to data source %s", dsType));
        CONTEXT_REDIS_HOLDER.set(dsType);
    }

    /**
     * Returns the data source key of the current thread
     */
    public static String getRedisSourceType() {
        return CONTEXT_REDIS_HOLDER.get();
    }

    /**
     * Clears the data source key of the current thread
     */
    public static void clearRedisSourceType() {
        CONTEXT_REDIS_HOLDER.remove();
    }
}
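Servlet containers serve requests from a thread pool, so a key that is set but never cleared stays attached to the worker thread and is seen by the next request that thread handles. The sketch below reproduces this with a single-thread executor. It is an illustration under assumptions: ThreadLocalLeakSketch and handle are hypothetical names, and a plain ThreadLocal is used instead of the InheritableThreadLocal in the holder above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical demo of a thread-local key leaking between requests on a reused thread. */
final class ThreadLocalLeakSketch {

    private static final ThreadLocal<String> HOLDER = new ThreadLocal<>();

    /** Simulates one request on a pooled thread and returns the key the business code would see. */
    static String handle(ExecutorService pool, boolean pressureTest, boolean clearAfter) {
        Future<String> result = pool.submit(() -> {
            try {
                if (pressureTest) {
                    HOLDER.set("test");
                }
                return HOLDER.get();
            } finally {
                if (clearAfter) {
                    HOLDER.remove();
                }
            }
        });
        try {
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            handle(pool, true, false);                                        // pressure-test request, no cleanup
            System.out.println("leaked: " + handle(pool, false, false));      // prints "leaked: test"
            handle(pool, true, true);                                         // pressure-test request with cleanup
            System.out.println("cleaned: " + handle(pool, false, true));      // prints "cleaned: null"
        } finally {
            pool.shutdown();
        }
    }
}
```

In the first pair of calls an ordinary user request is routed with the key "test", which is exactly the data pollution that the isolation is meant to prevent. Clearing the key in a finally block after each request avoids it.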
7. Dynamic data source filter: DynamicRedisSourceFilter.java
package com.common.instance.demo.config.dynamicRedisSource;

import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**
 * @author TCM
 * @version 1.0
 * @description Filter that selects the dynamic Redis data source
 * @date 2023/3/26 18:00
 **/
@Component
@WebFilter(filterName = "dynamicRedisSourceFilter", urlPatterns = "/*")
@Order(-10)
public class DynamicRedisSourceFilter extends OncePerRequestFilter {

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        // Read the pressure-test flag from the request header
        String testFlag = request.getHeader("Test-Redis-Flag");
        try {
            // Any request that carries the flag is routed to the test data source
            if (testFlag != null) {
                DynamicRedisSourceContextHolder.setRedisSourceType("test");
            }
            // Continue down the filter chain
            filterChain.doFilter(request, response);
        } finally {
            // Container threads are reused, so always reset the key once the request is done
            DynamicRedisSourceContextHolder.clearRedisSourceType();
        }
    }
}
8. Testing the dynamic routing
# Route to the production Redis
curl --request GET \
  --url 'http://localhost:9013/instance-demo/lua/luaScript?keys=sku10&num=78'
# Route to the test Redis
curl --request GET \
  --url 'http://localhost:9013/instance-demo/lua/luaScript?keys=sku10&num=50' \
  --header 'Test-Redis-Flag: true'