中间件-分库分表路由组件开发
需求背景
数据库路由的需求背景主要来自于业务体量的增加,让原有的技术设计和实现不能承载现有增加的业务规模和体量,因此需要设计分库分表。
核心原理
在进行数据库操作前进行拦截,根据指定的routerKey进行路由计算,取得指定数据源并进行切换。
方案设计
- 数据库连接池配置:分库分表需要按需配置数据库连接源,在这些连接池的集合中进行动态切换操作
- AbstractRoutingDataSource,是用于动态切换数据源的Spring服务类,它提供了获取数据源的抽象方法determineCurrentLookupKey
- 路由计算:在路由计算中需要获取用于分库分表的字段,通过哈希值的计算以及扰动最终到达尽可能的散列,让数据均匀分散到各个库表中,关于散列的计算采用HashMap扰动函数的散列计算方式
HashMap扰动函数散列计算
public void doRouter(String dbKeyAttr) {
// 总划分数:分库数*分表数
int size = dbRouterConfig.getDbCount() * dbRouterConfig.getTbCount();
// 扰动函数;在 JDK 的 HashMap 中,对于一个元素的存放,需要进行哈希散列。而为了让散列更加均匀,所以添加了扰动函数。
int idx = (size - 1) & (dbKeyAttr.hashCode() ^ (dbKeyAttr.hashCode() >>> 16));
// 库表索引;相当于是把一个长条的桶,切割成段,对应分库分表中的库编号和表编号
// 公式目的;8个位置,计算出来的是位置在5 那么你怎么知道5是在2库1表。
int dbIdx = idx / dbRouterConfig.getTbCount() + 1;
int tbIdx = idx - dbRouterConfig.getTbCount() * (dbIdx - 1);
// 将计算的索引结果设置到ThreadLocal
DBContextHolder.setDBKey(String.format("%02d", dbIdx));
DBContextHolder.setTBKey(String.format("%03d", tbIdx));
logger.debug("数据库路由 dbIdx:{} tbIdx:{}", dbIdx, tbIdx);
}
ThreadLocal
用于创建线程局部变量。线程局部变量是指在每个线程中都拥有一份独立的变量副本,各个线程之间互不干扰。每个线程可以独立修改自己的副本,而不会影响其他线程的副本。在本项目用于存储路由计算的结果,后续使用到数据库操作时,可以直接从ThreadLocal中获取对应的数据源信息,而无需在每次调用数据库操作前都进行路由计算,且由于线程间的隔离性,各个线程锁取得的路由信息也都是隔离的,保证了并发性
需要注意的是,ThreadLocal在使用完毕后需要进行清理,也即在完成SQL操作后需要对ThreadLocal进行清理,以防造成内存泄露
/**
* @author Lobox
* @description: 数据源上下文
* @date 2023/8/6 13:51
*/
public class DBContextHolder {
private static final ThreadLocal<String> dbKey = new ThreadLocal<>();
private static final ThreadLocal<String> tbKey = new ThreadLocal<>();
public static void setDBKey(String dbKeyIdx){
dbKey.set(dbKeyIdx);
}
public static String getDBKey(){
return dbKey.get();
}
public static void setTBKey(String tbKeyIdx){
tbKey.set(tbKeyIdx);
}
public static String getTBKey(){
return tbKey.get();
}
public static void clearDBKey() {dbKey.remove();}
public static void clearTBKey() {tbKey.remove();}
}
自定义注解
自定义注解在本项目中的作用主要用于指定路由键以及指定是否开启分表
/**
* @author Lobox
* @description: 路由注解
* @date 2023/8/6 13:43
*/
@Documented // 注解包含到java文档
@Retention(RetentionPolicy.RUNTIME) // 运行时保留,可通过反射读取注解信息
@Target({ElementType.TYPE, ElementType.METHOD}) // 可应用于类、接口、枚举、方法
public @interface DBRouter {
/**
* 分库分表字段
*/
String key() default "";
}
/**
* @author Lobox
* @description: 路由策略,分表标记
* @date 2023/8/6 13:44
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface DBRouterStrategy {
boolean splitTable() default false;
}
从配置文件中读取数据源等信息
程序初始化时需要从配置文件中读取数据库连接信息并创建为数据源,以及获取默认的routerKey等信息
因此需要读取配置文件处理
在配置类中继承EnvironmentAware接口,并实现其public void setEnvironment(Environment environment)方法,该方法在配置类被执行时会自动优先调用
@Configuration
public class DataSourceAutoConfig implements EnvironmentAware {
/**
* 初始化时从配置文件中读取相应的配置,配置类需要实现EnvironmentAware接口并实现setEnvironment方法
* 该方法优先于其它Bean创建方法被调用
* @param environment 环境配置
*/
@Override
public void setEnvironment(Environment environment) {
// 读取配置信息
}
}
相关的配置文件信息如下:
# 多数据源路由配置
mini-db-router:
jdbc:
datasource:
dbCount: 2
tbCount: 4
default: db00
routerKey: uId
list: db01,db02
db00:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery?characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: lobox
password: 123
db01:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery_01?characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: lobox
password: 123
db02:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/lottery_02?characterEncoding=utf-8&serverTimezone=Asia/Shanghai
username: lobox
password: 123
对于简单的配置信息,例如dbCount/tbCount,可以通过如下方式使用environment直接读取
// 配置文件的匹配前缀
String prefix = "mini-db-router.jdbc.datasource.";
dbCount = Integer.valueOf(environment.getProperty(prefix + "dbCount"));
tbCount = Integer.valueOf(environment.getProperty(prefix + "tbCount"));
routerKey = environment.getProperty(prefix + "routerKey");
对于复杂的配置信息,例如数据源的连接信息,除了有多条子信息外,一般还需要将其读取为一个对象,那么便需要通过反射的方式进行读取
// 分库分表数据源
String dataSources = environment.getProperty(prefix + "list");
assert dataSources != null;
for (String dbInfo : dataSources.split(",")) {
Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + dbInfo, Map.class);
dataSourceMap.put(dbInfo, dataSourceProps);
}
/**
* @author Lobox
* @description: 配置属性获取工具类
* @date 2023/8/6 14:00
*/
public class PropertyUtil {
/**
* springboot版本
*/
private static int springBootVersion = 1;
/*
根据是否存在RelaxedPropertyResolver类判定当前springboot版本
*/
static {
try{
Class.forName("org.springframework.boot.bind.RelaxedPropertyResolver");
} catch (ClassNotFoundException e) {
springBootVersion = 2;
}
}
/**
* 根据springBoot版本调用对应的配置文件读取方法
* @param environment 环境上下文
* @param prefix 配置文件前缀
* @param targetClass 绑定后的目标对象类型
*/
@SuppressWarnings("unchecked")
public static <T> T handle(final Environment environment,final String prefix, final Class<T> targetClass) {
switch (springBootVersion) {
case 1:
return (T) v1(environment,prefix);
default:
return (T) v2(environment,prefix,targetClass);
}
}
private static Object v1(final Environment environment, final String prefix) {
try {
Class<?> resolverClass = Class.forName("org.springframework.boot.bind.RelaxedPropertyResolver");
Constructor<?> resolverConstructor = resolverClass.getDeclaredConstructor(PropertyResolver.class);
Method getSubPropertiesMethod = resolverClass.getDeclaredMethod("getSubProperties", String.class);
Object resolverObject = resolverConstructor.newInstance(environment);
String prefixParam = prefix.endsWith(".") ? prefix : prefix + ".";
return getSubPropertiesMethod.invoke(resolverObject, prefixParam);
} catch (final ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException
| IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
private static Object v2(final Environment environment, final String prefix, final Class<?> targetClass) {
try {
Class<?> binderClass = Class.forName("org.springframework.boot.context.properties.bind.Binder");
Method getMethod = binderClass.getDeclaredMethod("get", Environment.class);
Method bindMethod = binderClass.getDeclaredMethod("bind", String.class, Class.class);
Object binderObject = getMethod.invoke(null, environment);
String prefixParam = prefix.endsWith(".") ? prefix.substring(0, prefix.length() - 1) : prefix; // 去除前缀中的'.'
Object bindResultObject = bindMethod.invoke(binderObject, prefixParam, targetClass);
Method resultGetMethod = bindResultObject.getClass().getDeclaredMethod("get");
return resultGetMethod.invoke(bindResultObject);
} catch (final ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
}
v1方法对应SpringBoot1.x版本,v2方法则对应2.x版本,因此这里只对v2方法进行说明
- 首先,通过反射获取
org.springframework.boot.context.properties.bind.Binder
类的引用。 - 使用反射获取
Binder
类的get
方法,该方法可以获取Binder
实例,需要传入一个Environment
参数。 - 使用反射获取
Binder
类的bind
方法,该方法用于绑定配置属性的值,需要传入配置属性的前缀和目标类。 - 使用获取到的
Binder
实例调用bind
方法,将配置属性的前缀和目标类传入,获得一个绑定结果的对象。 - 通过调用绑定结果对象的
get
方法,获取绑定的属性值。 - 最终返回获取到的属性值,其形式为一个Map,有如下key:driver-class-name/url/username/password,它们的value即配置文件中的对应信息
最后将读取到的数据源信息dataSourceProps与对应的数据源名再组成k-v关系传入dataSourceMap中,最后的dataSourceMap内容如下:
key:db00-----value:dataSourceProps00
key:db01-----value:dataSourceProps01
之后遍历dataSourceMap中的连接信息,逐个创建为数据源,存放到targetDataSources中
完整代码
/**
* @author Lobox
* @description: 数据源配置解析
* @date 2023/8/6 14:27
*/
@Configuration
public class DataSourceAutoConfig implements EnvironmentAware {
/**
* 数据源配置
*/
private Map<String,Map<String,Object>> dataSourceMap = new HashMap<>();
/**
* 默认数据源配置
*/
private Map<String,Object> defaultDataSourceConfig;
/**
* 分库数量
*/
private int dbCount;
/**
* 分表数量
*/
private int tbCount;
/**
* 路由字段
*/
private String routerKey;
/**
* 该方法的两个参数会自动从ioc中获取,故可以直接使用
* DBRouterConfig和IDBRouterStrategy的注入会优先于point方法执行
* @param dbRouterConfig
* @param dbRouterStrategy
* @return
*/
@Bean(name = "db-router-point")
@ConditionalOnMissingBean // 存在该bean则不再注入ioc
public DBRouterJoinPoint point(DBRouterConfig dbRouterConfig, IDBRouterStrategy dbRouterStrategy) {
return new DBRouterJoinPoint(dbRouterConfig, dbRouterStrategy);
}
@Bean
public DBRouterConfig dbRouterConfig() {
return new DBRouterConfig(dbCount,tbCount,routerKey);
}
@Bean
public Interceptor plugin() {
return new DynamicMybatisPlugin();
}
/**
* 将反射获取的配置文件中的数据源信息转换为数据源
* @return
*/
@Bean
public DataSource dataSource() {
// 创建数据源
Map<Object, Object> targetDataSources = new HashMap<>();
for (String dbInfo : dataSourceMap.keySet()) {
Map<String, Object> objMap = dataSourceMap.get(dbInfo);
targetDataSources.put(dbInfo, new DriverManagerDataSource(objMap.get("url").toString(), objMap.get("username").toString(), objMap.get("password").toString()));
}
// 设置数据源
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(new DriverManagerDataSource(defaultDataSourceConfig.get("url").toString(), defaultDataSourceConfig.get("username").toString(), defaultDataSourceConfig.get("password").toString()));
return dynamicDataSource;
}
@Bean
public IDBRouterStrategy dbRouterStrategy(DBRouterConfig dbRouterConfig) {
return new DBRouterStrategyHashCode(dbRouterConfig);
}
/**
* 手动指定事务的数据源
* 事务中若出现数据源切换将导致事务失效
* 故事务中需要直接手动指定数据源,而不使用注解
* @param dataSource 数据源
* @return
*/
@Bean
public TransactionTemplate transactionTemplate(DataSource dataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dataSource);
TransactionTemplate transactionTemplate = new TransactionTemplate();
transactionTemplate.setTransactionManager(dataSourceTransactionManager);
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return transactionTemplate;
}
/**
* 初始化时从配置文件中读取相应的配置,配置类需要实现EnvironmentAware接口并实现setEnvironment方法
* 该方法优先于其它Bean创建方法被调用
* @param environment 环境配置
*/
@Override
public void setEnvironment(Environment environment) {
// 配置文件的匹配前缀
String prefix = "mini-db-router.jdbc.datasource.";
dbCount = Integer.valueOf(environment.getProperty(prefix + "dbCount"));
tbCount = Integer.valueOf(environment.getProperty(prefix + "tbCount"));
routerKey = environment.getProperty(prefix + "routerKey");
// 分库分表数据源
String dataSources = environment.getProperty(prefix + "list");
assert dataSources != null;
for (String dbInfo : dataSources.split(",")) {
Map<String, Object> dataSourceProps = PropertyUtil.handle(environment, prefix + dbInfo, Map.class);
dataSourceMap.put(dbInfo, dataSourceProps);
}
// 默认数据源
String defaultData = environment.getProperty(prefix + "default");
defaultDataSourceConfig = PropertyUtil.handle(environment, prefix + defaultData, Map.class);
}
}
注解方式分库分表执行-AOP
使用注解方式开启分库分表,需要使用AOP的方式,为对应的Mapper方法创建环绕通知Around,在方法执行前完成路由计算,在方法执行完成后释放ThreadLocal中存储的路由计算结果
/**
* @author Lobox
* @description: 数据路由切面,通过自定义注解的方式,拦截被切面的方法,进行数据库路由
* @date 2023/8/6 14:32
*/
@Aspect
public class DBRouterJoinPoint {
private Logger logger = LoggerFactory.getLogger(DBRouterJoinPoint.class);
private DBRouterConfig dbRouterConfig;
private IDBRouterStrategy dbRouterStrategy;
public DBRouterJoinPoint(DBRouterConfig dbRouterConfig, IDBRouterStrategy dbRouterStrategy) {
this.dbRouterConfig = dbRouterConfig;
this.dbRouterStrategy = dbRouterStrategy;
}
/**
* 创建DBRouter注解为切点
*/
@Pointcut("@annotation(top.alobox.middleware.db.router.annotation.DBRouter)")
public void aopPoint() {
}
/**
* 所有需要分库分表的操作,都需要使用自定义注解进行拦截,拦截后读取方法中的入参字段,根据字段进行路由操作。
* 1. dbRouter.key() 确定根据哪个字段进行路由
* 2. getAttrValue 根据数据库路由字段,从入参中读取出对应的值。比如路由 key 是 uId,那么就从入参对象 Obj 中获取到 uId 的值。
* 3. dbRouterStrategy.doRouter(dbKeyAttr) 路由策略根据具体的路由值进行处理
* 4. 路由处理完成比,就是放行。 jp.proceed();
* 5. 最后 dbRouterStrategy 需要执行 clear 因为这里用到了 ThreadLocal 需要手动清空。关于 ThreadLocal 内存泄漏介绍 https://t.zsxq.com/027QF2fae
*/
@Around("aopPoint() && @annotation(dbRouter)")
public Object doRouter(ProceedingJoinPoint jp, DBRouter dbRouter) throws Throwable {
// 从DBRouter注解中或dbRouterConfig中获取路由字段
// 前者是在注解中指定路由字段,后者使用配置文件中配置的路由字段
String dbKey = dbRouter.key();
if (StringUtils.isBlank(dbKey) && StringUtils.isBlank(dbRouterConfig.getRouterKey())) {
throw new RuntimeException("annotation DBRouter key is null!");
}
dbKey = StringUtils.isNotBlank(dbKey) ? dbKey : dbRouterConfig.getRouterKey();
// 路由属性
String dbKeyAttr = getAttrValue(dbKey, jp.getArgs());
// 路由策略
dbRouterStrategy.doRouter(dbKeyAttr);
// 返回结果
try {
return jp.proceed();
} finally {
dbRouterStrategy.clear();
}
}
private Method getMethod(JoinPoint jp) throws NoSuchMethodException {
Signature sig = jp.getSignature();
MethodSignature methodSignature = (MethodSignature) sig;
return jp.getTarget().getClass().getMethod(methodSignature.getName(), methodSignature.getParameterTypes());
}
public String getAttrValue(String attr, Object[] args) {
if (1 == args.length) {
Object arg = args[0];
if (arg instanceof String) {
return arg.toString();
}
}
String filedValue = null;
for (Object arg : args) {
try {
if (StringUtils.isNotBlank(filedValue)) {
break;
}
// filedValue = BeanUtils.getProperty(arg, attr);
// fix: 使用lombok时,uId这种字段的get方法与idea生成的get方法不同,会导致获取不到属性值,改成反射获取解决
filedValue = String.valueOf(this.getValueByName(arg, attr));
} catch (Exception e) {
logger.error("获取路由属性值失败 attr:{}", attr, e);
}
}
return filedValue;
}
/**
* 获取对象的特定属性值
*
* @author tang
* @param item 对象
* @param name 属性名
* @return 属性值
*/
private Object getValueByName(Object item, String name) {
try {
Field field = getFieldByName(item, name);
if (field == null) {
return null;
}
field.setAccessible(true);
Object o = field.get(item);
field.setAccessible(false);
return o;
} catch (IllegalAccessException e) {
return null;
}
}
/**
* 根据名称获取方法,该方法同时兼顾继承类获取父类的属性
*
* @author tang
* @param item 对象
* @param name 属性名
* @return 该属性对应方法
*/
private Field getFieldByName(Object item, String name) {
try {
Field field;
try {
field = item.getClass().getDeclaredField(name);
} catch (NoSuchFieldException e) {
field = item.getClass().getSuperclass().getDeclaredField(name);
}
return field;
} catch (NoSuchFieldException e) {
return null;
}
}
}
数据源的动态获取
每当需要调用数据库操作时,都会自动调用AbstractRoutingDataSource中的determineCurrentLookupKey来进行数据源的选择,且该类维护了一个TargetDataSources,在上面的读取配置文件操作后,我们将所有的数据源都存储到了TargetDataSource中,因此创建一个AbstractRoutingDataSource的实现类,并重写其determineCurrentLookupKey方法,在方法中从ThreadLocal获取路由计算好的目标数据源进行返回
public class DynamicDataSource extends AbstractRoutingDataSource {
/**
* 每当需要调用数据库操作时,都会自动调用该方法
* @return 对应的路由数据源
*/
@Override
protected Object determineCurrentLookupKey() {
// 从路由上下文中获取doRouter计算好的路由目标数据源
return "db" + DBContextHolder.getDBKey();
}
}
事务中的分库分表
问题:如果一个场景需要在同一个事务下,连续操作不同的DAO操作,那么就会涉及到在 DAO 上使用注解 @DBRouter(key = "uId") 反复切换路由的操作。虽然都是一个数据源,但这样切换后,事务就没法处理了。
解决方案便是在将数据源的操作放在事务操作之前,并且使用编程式事务,以实现精细化处理,如手动回滚等,而无需抛出异常
路由组件中的自定义事务模板代码
@Bean
public TransactionTemplate transactionTemplate(DataSource dataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
dataSourceTransactionManager.setDataSource(dataSource);
TransactionTemplate transactionTemplate = new TransactionTemplate();
transactionTemplate.setTransactionManager(dataSourceTransactionManager);
transactionTemplate.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return transactionTemplate;
}
使用范例(下文代码中的dbRouter为路由策略类,已在路由组件的代码中注入到ioc,在完成依赖注入后即可使用并手动路由)
/**
* 路由策略
*/
@Resource
private IDBRouterStrategy dbRouter;
@Override
protected Result grabActivity(PartakeReq partake, ActivityBillVO bill, Long takeId) {
try {
dbRouter.doRouter(partake.getUId());
return transactionTemplate.execute(status -> {
try{
// 扣减个人已参与次数
int updateCount = userTakeActivityRepository.subtractionLeftCount(bill.getActivityId(), bill.getActivityName(), bill.getTakeCount(), bill.getUserTakeLeftCount(), partake.getUId());
if (0 == updateCount) {
status.setRollbackOnly();
logger.error("领取活动,扣减个人已参与次数失败 activityId:{} uId:{}", partake.getActivityId(), partake.getUId());
return Result.buildResult(Constants.ResponseCode.NO_UPDATE);
}
// 写入领取活动记录
userTakeActivityRepository.takeActivity(bill.getActivityId(), bill.getActivityName(), bill.getStrategyId(), bill.getTakeCount(), bill.getUserTakeLeftCount(), partake.getUId(), partake.getPartakeDate(), takeId);
} catch (DuplicateKeyException e) {
status.setRollbackOnly();
logger.error("领取活动,唯一索引冲突 activityId:{} uId:{}", partake.getActivityId(), partake.getUId(), e);
return Result.buildResult(Constants.ResponseCode.INDEX_DUP);
}
return Result.buildSuccessResult();
});
} finally {
dbRouter.clear();
}
}
评论区