若依微服务数据源配置 微服务配置多数据源

您所在的位置:网站首页 微服务2个数据源 若依微服务数据源配置 微服务配置多数据源

若依微服务数据源配置 微服务配置多数据源

2024-07-06 10:13| 来源: 网络整理| 查看: 265

目标:将原有产品(微服务化后台)进行SAAS化改造。

后台主要已使用的相关技术:spring boot, spring cloud(eureka、zuul),shiro,mybatis-plus,qutaz(分布式),kafka, alibaba Druid

SAAS模式:

多租户数据进行数据库schema级别隔离,共用同一套应用。

方案实施:

1、对现有工程在进行数据操作层进行多数据源改造。

2、对工程的应用入口,通过JWT中租户的信息,进行数据源的动态切换,使当前thread均使用同一个数据源,同时有避免频繁的切换。

闲话少叙,我们逐步展开。

第一部分: 对现有微服务工程进行多数据源能力改造。多数据源的定义者@Data public class DynamicDataSource extends AbstractRoutingDataSource { /** * 用于保存租户key和数据源的映射关系,目标数据源map的拷贝 */ private Map defaultDataSources; @Override protected Object determineCurrentLookupKey() { return DbContextHolder.getDbType(); } /** * 动态数据源构造器 * @param defaultDataSource 默认数据源 * @param targetDataSource 目标数据源映射 */ public DynamicDataSource(DataSource defaultDataSource, Map targetDataSource){ defaultDataSources = targetDataSource; super.setDefaultTargetDataSource(defaultDataSource); // 存放数据源的map super.setTargetDataSources(defaultDataSources); // afterPropertiesSet 的作用很重要,它负责解析成可用的目标数据源 super.afterPropertiesSet(); } /** * 添加数据源到目标数据源map中 * @param datasource */ public void addDataSource(String url, String username, String pw, String tcode) { defaultDataSources.remove(tcode); DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(url); druidDataSource.setUsername(username); druidDataSource.setPassword(pw); // 将传入的数据源对象放入动态数据源类的静态map中,然后再讲静态map重新保存进动态数据源中 defaultDataSources.put(tcode, druidDataSource); super.setTargetDataSources(defaultDataSources); super.afterPropertiesSet(); DbContextHolder.setDbType(tcode); } }

主要做了这样几件事情:

构造方法定义默认的数据源(DynamicDataSource(DataSource defaultDataSource, Map targetDataSource)。返回需要使用数据源的key(determineCurrentLookupKey()),该方法为父类方法,重写后,返回数据源map中的key值,从而确定当前要使用的数据源。具体原理参见父类就一目了然(AbstractRoutingDataSource)。

父类代码:

protected DataSource determineTargetDataSource() { Assert.notNull(this.resolvedDataSources, "DataSource router not initialized"); Object lookupKey = determineCurrentLookupKey(); DataSource dataSource = this.resolvedDataSources.get(lookupKey); if (dataSource == null && (this.lenientFallback || lookupKey == null)) { dataSource = this.resolvedDefaultDataSource; } if (dataSource == null) { throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]"); } return dataSource; }将数据源添加到动态数据源的map中(addDataSource(String url, String username, String pw, String tcode))2. 线程缓存当前数据源

定义当前线程缓存,用来保存当前线程所用数据源的key值

public class DbContextHolder { @SuppressWarnings("rawtypes") private static final ThreadLocal contextHolder = new ThreadLocal(); /** * 设置数据源 * @param dbTypeEnum */ @SuppressWarnings("unchecked") public static void setDbType(String dbType) { contextHolder.set(dbType); } /** * 取得当前数据源 * @return */ public static String getDbType() { return (String) contextHolder.get(); } /** * 清除上下文数据 */ public static void clearDbType() { contextHolder.remove(); } }3.多数据源配置

主要做了这样几件事情:

初始化defaultDataSource和dynamicDataSource,并放入bean工厂初始化SqlSessionFactory并放入bean工厂。@Configuration @EnableConfigurationProperties(MybatisPlusProperties.class) public class DdsConfig implements InitializingBean { // springboot配置文件中mybatisplus部分的属性 private final MybatisPlusProperties properties; // spring容器context,主要用于获取bean工厂中的某个bean实例 private final ApplicationContext applicationContext; // 工程自定义的拦截器 private final Interceptor[] interceptors; // springboot配置文件读取用的loader private final ResourceLoader resourceLoader; private final List configurationCustomizers; public DdsConfig(MybatisPlusProperties properties, ApplicationContext applicationContext, ObjectProvider interceptorsProvider, ResourceLoader resourceLoader, List configurationCustomizers) { this.properties = properties; this.applicationContext = applicationContext; this.interceptors = interceptorsProvider.getIfAvailable(); this.resourceLoader = resourceLoader; this.configurationCustomizers = configurationCustomizers; } @Override public void afterPropertiesSet() throws Exception { } /** * 通过properties文件数据源的配置,初始化默认的datasource并放入spring bean工厂 * @param */ @Bean("defaultDataSource") @ConfigurationProperties("spring.datasource.druid") public DataSource defaultDataSource() { DataSource dataSource = DruidDataSourceBuilder.create().build(); return dataSource; } /** * 初始化动态数据源,放入spring bean工厂 * @param */ @Bean("dynamicDataSource") @DependsOn({ "springContext" }) @Primary public DataSource dynamicDataSource() { Map targetDataSources = new HashMap(); DynamicDataSource dynamicDataSource = new DynamicDataSource(defaultDataSource(), targetDataSources); return dynamicDataSource; } @Bean("sqlSessionFactoryBean") public SqlSessionFactory sqlSessionFactoryBean() throws Exception { MybatisSqlSessionFactoryBean sessionFactory = new MybatisSqlSessionFactoryBean(); // 指定mapper的扫描路径 sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mapper/**/*.xml")); //设定动态数据源 sessionFactory.setDataSource(dynamicDataSource()); sessionFactory.setVfs(SpringBootVFS.class); // 设定peoperties文件位置 if (StringUtils.hasText(this.properties.getConfigLocation())) { sessionFactory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation())); } // 设定peoperties文件中定义的属性值 applyConfiguration(sessionFactory); if (this.properties.getConfigurationProperties() != null) { sessionFactory.setConfigurationProperties(this.properties.getConfigurationProperties()); } // 注入拦截器 if (!ObjectUtils.isEmpty(this.interceptors)) { sessionFactory.setPlugins(this.interceptors); } // 注入MetaObjectHandler,如果工程里继承了该类,进行插入、更新操作时候自动填充例如创建时间(者)、更新时间(者)值的时候,一定要注入该类,切记切结! GlobalConfig globalConfig = this.properties.getGlobalConfig(); if (this.applicationContext.getBeanNamesForType(MetaObjectHandler.class, false, false).length > 0) { MetaObjectHandler metaObjectHandler = this.applicationContext.getBean(MetaObjectHandler.class); globalConfig.setMetaObjectHandler(metaObjectHandler); } if (this.applicationContext.getBeanNamesForType(IKeyGenerator.class, false, false).length > 0) { IKeyGenerator keyGenerator = this.applicationContext.getBean(IKeyGenerator.class); globalConfig.getDbConfig().setKeyGenerator(keyGenerator); } sessionFactory.setGlobalConfig(globalConfig); return sessionFactory.getObject(); } private void applyConfiguration(MybatisSqlSessionFactoryBean factory) { MybatisConfiguration configuration = this.properties.getConfiguration(); if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) { configuration = new MybatisConfiguration(); } if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) { for (ConfigurationCustomizer customizer : this.configurationCustomizers) { customizer.customize(configuration); } } factory.setConfiguration(configuration); } @Bean("transactionManager") public PlatformTransactionManager transactionManager() { // 配置事务管理, 使用事务时在方法头部添加@Transactional注解即可 return new DataSourceTransactionManager(dynamicDataSource()); } }第二部分:动态数据源的切换

对工程的应用入口,通过JWT中租户的信息,进行数据源的动态切换,使当前thread均使用同一个数据源,同时有避免频繁的切换。

该部分主要解决了什么时候、从何处获得租户信息、如何进行数据源切换。

改造前工程主要有三个触发新的线程的地方,其解决上述问题的方案也有不同。

1. Resful service,http请求

获取数据源信息并放入线程缓存。

原有系统里使用shiro进行了权限管理,因此:

场景一:login,登录时,根据租户code获取其数据源信息,并将数据源信息保存到创建的JWT,同时进行数据源切换。 定义拦截器,并指定不需要在controller AOP处进行数据源切换。 public class MyAnonymousFilter extends AnonymousFilter { private static final Logger logger = LoggerFactory.getLogger(MyAnonymousFilter.class); @Override protected boolean onPreHandle(ServletRequest request, ServletResponse response, Object mappedValue) { logger.info("Execute Anonymous Filter, set AOP_NEED_SWITCH_DS to false in cache."); ThreadCache.set(ThreadCache.AOP_NEED_SWITCH_DS, false); return true; } }

ShiroConfig部分代码:

@Bean("shiroFilter") public ShiroFilterFactoryBean shirFilter(SecurityManager securityManager) { ShiroFilterFactoryBean shiroFilter = new ShiroFilterFactoryBean(); shiroFilter.setSecurityManager(securityManager); Map filters = new HashMap(); filters.put("jwt", new JWTFilter()); filters.put("myAnon", new MyAnonymousFilter()); shiroFilter.setFilters(filters); Map filterMap = new LinkedHashMap(); filterMap.put("/login", "myAnon"); filterMap.put("/**", "jwt"); shiroFilter.setFilterChainDefinitionMap(filterMap); return shiroFilter; }场景二:其他

第一步:原系统使用shrio进行权限控制,因此在class JwtRealm extends AuthorizingRealm 中,解析JWT,获取租户数据源信息,放入线程缓存。

@Component public class JwtRealm extends AuthorizingRealm { private static final Logger logger = LoggerFactory.getLogger(JwtRealm.class); @Autowired private RedisUtils redisUtils; @Autowired private ServiceConfig serviceConfig; @Override public boolean supports(AuthenticationToken token) { boolean isOk = token instanceof JWTToken; return isOk; } /** * 授权(验证权限时调用) */ @Override protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principals) { AccountContext accountContext = principals.getPrimaryPrincipal(); // 用户权限列表 List allPermission = accountContext.getAllPermissions(); SimpleAuthorizationInfo info = new SimpleAuthorizationInfo(); info.setStringPermissions(new HashSet(allPermission)); return info; } /** * 认证(登录时调用) */ @Override protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken token) throws AuthenticationException { String jwt = (String) token.getPrincipal(); // JWT in redis, key:token, value:accountContext, if not exist in cache, need to // login again AccountContext accountContext = redisUtils.get(jwt); if (accountContext == null) { logger.info("Jwt is not valid and no account context in cache fetched."); throw new MyException(MessageUtils.getMessage(ErrorCode.INVALID_INVOKE)); } if (CommonConst.SAAS_ENABLE) { ThreadCache.set(ThreadCache.AOP_NEED_SWITCH_DS, null); ThreadCache.set(ThreadCache.TENANT_CODE, accountContext.getTcode()); ThreadCache.set(ThreadCache.TENANT_DS_URL, accountContext.getDsURL()); ThreadCache.set(ThreadCache.TENANT_DS_USER, accountContext.getDsUser()); ThreadCache.set(ThreadCache.TENANT_DS_PW, accountContext.getDsPw()); } // do other logic return info; } @Override protected boolean isPermitted(Permission permission, AuthorizationInfo info) { Collection perms = getPermissions(info); if (perms != null && !perms.isEmpty()) { for (Permission perm : perms) { if (perm.equals(permission)) { return true; } } } return false; } }

第二步:定义AOP,创建所有controller的切面,获取当前ThreadLocal中保存的数据源信息(url、username、password),进行动态数据源的切换。

@Aspect @Component class DdsAspect { private static final Logger logger = LoggerFactory.getLogger(DdsAspect.class); @Autowired private DynamicDataSource dynamicDataSource; @Pointcut("execution(public * com.modules..*.controller..*(..)))") public void ControllerAspect() { } @Before("ControllerAspect()") public void beforController() { SetDataSource(); } private void SetDataSource() { // Boolean needDsChange = (Boolean) ThreadCache.get(ThreadCache.AOP_NEED_SWITCH_DS); if(needDsChange!=null && !needDsChange) { return; } String url = (String) ThreadCache.get(ThreadCache.TENANT_DS_URL); String username = (String) ThreadCache.get(ThreadCache.TENANT_DS_USER); String password = (String) ThreadCache.get(ThreadCache.TENANT_DS_PW); String tcode = (String) ThreadCache.get(ThreadCache.TENANT_CODE);dynamicDataSource.addDataSource(url, username, password, tcode); } }2. Kafka的listener

由于系统中集成了Kafka,此处也涉及到改造。

系统模块间的消息通信,消息中都要有JWT,因此消息监听的类里,解析JWT中存储的数据源信息,并进行数据源的切换。

@Component public class MessageSubscriber extends SubscriberBase { private static final Logger logger = LoggerFactory.getLogger(MessagePublisher.class); @Autowired private RedisUtils redisUtils; @Autowired private DynamicDataSource dynamicDataSource; @SuppressWarnings("unchecked") @KafkaListener(topics = "${kafka.topics.t1}") public void processMessage(String content) { MessageDTO messageDTO = JsonUtils.jsonToPojo(content, MessageDTO.class); if(messageDTO != null) { String token = messageDTO.getToken(); // AccountContext为自定义结构化当前登录用户信息,可自行设计 AccountContext accountContext = redisUtils.get(token); ThreadCache.set(ThreadCache.JWT_TEMP, token); if(tcode!=null && CommonConst.SAAS_ENABLE) { String tcode = accountContext.getTcode(); logger.info("Set tcode of current thread: " + tcode); String url = accountContext.getDsURL(); String username = accountContext.getDsUser(); String password = accountContext.getDsPw(); dynamicDataSource.addDataSource(url, username, password, tcode); } else if (tcode == null && CommonConst.SAAS_ENABLE){ logger.info("There is no tcode sent in message in saas mode, do nothing!"); return; } // process message as per need } else { logger.warn("Receive empty message"); } } }3. 分布式schedule job

原系统中使用了quartz作为分布式定时任务的解决方案,因此改造比较容易。只需要在继承成QuartzJobBean的类中,根据租户编码切换数据源即可。

此处不赘述了。

 

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3