Sharding

您所在的位置:网站首页 mod源码 Sharding

Sharding

2023-08-30 14:13| 来源: 网络整理| 查看: 265

Sharding-Jdbc 源码分析

Sharding-JDBC 是 ShardingSphere 开源的分布式数据库中间件产品之一,提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如Java同构、异构语言、云原生等各种多样化的应用场景。 Sharding-JDBC 在 Java 的 JDBC 层提供额外服务,它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架

Apache Sharding-Jdbc分析

JDBC 调用过程如下:sql解析 -> sql路由 -> sql改写 -> sql执行 -> sql归并。

首先看一下shardingsphere-jdbc的包的整体架构:

shardingsphere-jdbc ├── shardingsphere-jdbc-core ├── shardingsphere-jdbc-governance ├── shardingsphere-jdbc-spring │ ├── shardingsphere-jdbc-core-spring │ ├── shardingsphere-jdbc-governance-spring │ ├── shardingsphere-jdbc-spring-infra │ └── shardingsphere-jdbc-transaction-spring JDBC重要的四大对象

在JDBC中常用的类有:

DriverManager;(注册驱动类,调用该类就执行静态代码块的内容,自己注册)

Connection;数据库连接, 可以得到statement,preparedStatement:java.sql.类

Statement;Statement接口代表静态SQL语句,它用于使用Java程序创建和执行通用SQL语句

 ResultSet。结果集(ResultSet)是数据中查询结果返回的一种对象,可以说结果集是一个存储查询结果的对象,但是结果集并不仅仅具有存储的功能,他同时还具有操纵数据的功能,可能完成对数据的更新等。

shardingsphere-jdbc-core中含有

ShardingSphereDataSource ShardingSphereConnection ShardingSphereResultSet ShardingSphereStatement

四个重写的JDBC对象其中三个对象都继承了WrapperAdapter类,所以我们先对WrapperAdapter进行一些讨论,

bca8d41ca1b57864452cbf5a2122c36e.png

首先,我们看到这里有一个 JdbcObject 接口,这个接口泛指 JDBC API 中的 DataSource、Connection、Statement 等核心接口。前面提到,这些接口都继承自包装器 Wrapper 接口。ShardingSphere 为这个 Wrapper 接口提供了一个实现类 WrapperAdapter,这点在图中得到了展示。在 ShardingSphere 代码工程 sharding-jdbc-core 的 org.apache.shardingsphere.shardingjdbc.jdbc.adapter 包中包含了所有与 Adapter 相关的实现类

WrapperAdapter /** * record method invocation. * * @param targetClass target class * @param methodName method name * @param argumentTypes argument types * @param arguments arguments */ @SneakyThrows(ReflectiveOperationException.class) public final void recordMethodInvocation(final Class targetClass, final String methodName, final Class[] argumentTypes, final Object[] arguments) { jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments)); } /** * Replay methods invocation. * * @param target target object */ public final void replayMethodsInvocation(final Object target) { jdbcMethodInvocations.forEach(each -> each.invoke(target)); }

WrapperAdapter的实现方法中包含recordMethodInvocation和replayMethodsInvocation,其中都调用了JdbcMethodInvocation ,JdbcMethodInvocation 类中用到了反射技术根据传入的 method 和 arguments 对象执行对应方法。

了解了 JdbcMethodInvocation 类的原理后,我们就不难理解 recordMethodInvocation 和 replayMethodsInvocation 方法的作用。其中,recordMethodInvocation 用于记录需要执行的方法和参数,而 replayMethodsInvocation 则根据这些方法和参数通过反射技术进行执行。

对于执行 replayMethodsInvocation,我们必须先找到 recordMethodInvocation 的调用入口。通过代码的调用关系,可以看到在 AbstractConnectionAdapter 中对其进行了调用,具体来说就是 setAutoCommit、setReadOnly 和 setTransactionIsolation 这三处方法。这里以 setReadOnly 方法为例给出它的实现:

@Override public final void setReadOnly(final boolean readOnly) throws SQLException { this.readOnly = readOnly; recordMethodInvocation(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly}); forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly)); }

另一方面,从类层关系上,可以看到 AbstractConnectionAdapter 直接继承的是 AbstractUnsupportedOperationConnection 而不是 WrapperAdapter,而在 AbstractUnsupportedOperationConnection 中都是一组直接抛出异常的方法。 代码如下

@Override public final CallableStatement prepareCall(final String sql) throws SQLException { throw new SQLFeatureNotSupportedException("prepareCall"); }

基本上AbstractUnsupportedOperationConnection这个类就是用于校验以及明确子类AbstractConnectionAdapter的操作权限的。属于职责分离的设计模式,非常不错。

ShardingSphereDataSource

首先我们来看下ShardingSphereDataSource这个类

public final class ShardingSphereDataSourceFactory { /** * Create ShardingSphere data source. */ public static DataSource createDataSource(final Map dataSourceMap, final Collection configurations, final Properties props) throws SQLException { return new ShardingSphereDataSource(dataSourceMap, configurations, props); } public static DataSource createDataSource(final DataSource dataSource, final Collection configurations, final Properties props) throws SQLException { Map dataSourceMap = new HashMap(1, 1); dataSourceMap.put(DefaultSchema.LOGIC_NAME, dataSource); return createDataSource(dataSourceMap, configurations, props); }

ShardingSphereDataSourceFactory里面实现了两个初始化dataSource的方法,实际上就是通过 Map dataSourceMap, final Collection configurations, final Properties props 这三个参数初始化ShardingSphereDataSource

而ShardingSphereDataSource 中引用了两个上下文的配置接口

private final MetaDataContexts metaDataContexts; private final TransactionContexts transactionContexts;

首先来看一下metaDataContexts的实现类

metaDataContexts分别在governance(治理)infra(基础)有两个实现 屏幕快照 2021-08-29 下午12.25.35.png

在infra包中实现了StandardMetaDataContexts这个基础的元数据配置上下文,引入ExecutorEngine CalciteContextFactory等非常重要的配置以及引擎方法。

@Getter public final class StandardMetaDataContexts implements MetaDataContexts { private final Map metaDataMap; private final ExecutorEngine executorEngine; private final CalciteContextFactory calciteContextFactory; private final Authentication authentication; private final ConfigurationProperties props; private final LockContext lockContext; private final StateContext stateContext; public StandardMetaDataContexts() { this(new LinkedHashMap(), null, new DefaultAuthentication(), new ConfigurationProperties(new Properties())); } public StandardMetaDataContexts(final Map metaDataMap, final ExecutorEngine executorEngine, final Authentication authentication, final ConfigurationProperties props) { this.metaDataMap = new LinkedHashMap(metaDataMap); this.executorEngine = executorEngine; calciteContextFactory = new CalciteContextFactory(metaDataMap); this.authentication = AuthenticationEngine.findSPIAuthentication().orElse(authentication); this.props = props; lockContext = new StandardLockContext(); stateContext = new StateContext(); }

还有TransactionContext 这个配置类

/** * Transaction contexts. */ public interface TransactionContexts extends AutoCloseable { /** * Get transaction manager engines. * * @return transaction manager engines */ Map getEngines(); /** * Get default transaction manager engine. * * @return default transaction manager engine */ ShardingTransactionManagerEngine getDefaultTransactionManagerEngine(); }

然后我们先回到sharding-jdbc这个包的内容,infra和governance的部分等之后再进行分析。 启动ShardingSphereDataSourceFactoryTest测试类 在创建DataSource的地方打上断点看一下 屏幕快照 2021-08-29 下午12.52.34.png createDatasource的方法调用了metaDataContexts引用上下文,传入了测试数据中的

屏幕快照 2021-08-29 下午12.56.56.png

private ShardingRuleConfiguration createShardingRuleConfiguration() { ShardingRuleConfiguration result = new ShardingRuleConfiguration(); result.getTables().add(new ShardingTableRuleConfiguration("logicTable", "logic_db.table_${0..2}")); return result; }

这个测试类中生成了shadingTable的规则配置,在createDataSource方法中注入了引用上下文,完成了dataSource的初始化。

在同一边还有YamlShardingSphereDataSourceFactory类,其实现就是先通过yml规则引擎转译yml文件,然后在通过ShardingSphereDataSourceFactory.createDatasource来启动

public static DataSource createDataSource(final File yamlFile) throws SQLException, IOException { YamlRootRuleConfigurations configurations = YamlEngine.unmarshal(yamlFile, YamlRootRuleConfigurations.class); return ShardingSphereDataSourceFactory.createDataSource(DATASOURCE_SWAPPER.swapToDataSources(configurations.getDataSources()), SWAPPER_ENGINE.swapToRuleConfigurations(configurations.getRules()), configurations.getProps()); } ShardingSphereConnection ShardingSphereConnection extends AbstractConnectionAdapter implements ExecutorJDBCManager

ShardingSphereConnection继承了AbstractConnectionAdapter类,实现了ExecutorJDBCManager的接口,最底层的类是java.sql.Connection

ShardingSphere通过AbstractConnectionAdapter包装了Connection的实现类, Adapter类上文已经有部分讨论,这里就不予多说了。

运行ShardingSphereConnectionTest测试类,我们来看一下ShardingSphereConnection的运行过程 执行方法以后调用了getConnection()

@Test public void assertGetConnectionFromCache() throws SQLException { assertThat(connection.getConnection("test_primary_ds"), is(connection.getConnection("test_primary_ds"))); }

然后调用到

public Connection getConnection(final String dataSourceName) throws SQLException { return getConnections(dataSourceName, 1, ConnectionMode.MEMORY_STRICTLY).get(0); } @Override public List getConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException { DataSource dataSource = dataSourceMap.get(dataSourceName); Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName); Collection connections; synchronized (getCachedConnections()) { connections = getCachedConnections().get(dataSourceName); }

屏幕快照 2021-08-29 下午3.11.09.png 我们可以看到断点中的数据包含dataSourceName是传入的test_primary_ds和test_replica_ds

因为test程序在初始化的时候初始了原表和复制两个表

@BeforeClass public static void init() throws SQLException { DataSource primaryDataSource = mockDataSource(); DataSource replicaDataSource = mockDataSource(); dataSourceMap = new HashMap(2, 1); dataSourceMap.put("test_primary_ds", primaryDataSource); dataSourceMap.put("test_replica_ds", replicaDataSource); }

ShardingConnection 直接继承了AbstractConnectionAdapter。在 AbstractConnectionAdapter 中发现了一个 cachedConnections 属性,它是一个 Map 对象。

@Getter private final Multimap cachedConnections = LinkedHashMultimap.create();

该对象其实缓存了这个经过封装的 ShardingConnection 背后真实的 Connection 对象。如果我们对一个 AbstractConnectionAdapter 重复使用,那么这些 cachedConnections 也会一直被缓存,直到调用 close 方法

所以在getConnection中获得了缓存的两个数据源的数据。在实际使用的过程中也是靠这个类去获得所用的所有数据源的,从而能达到分库的情况下一个语句查询多个库表的能力。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3