手把手教你使用Kettle JAVA API进行数据抽取

您所在的位置:网站首页 java调用kettle文件 手把手教你使用Kettle JAVA API进行数据抽取

手把手教你使用Kettle JAVA API进行数据抽取

#手把手教你使用Kettle JAVA API进行数据抽取| 来源: 网络整理| 查看: 265

原文转自:轻样知生 - 手把手教你使用Kettle JAVA API进行数据抽取 - Tylili

Kettle作为一款优秀的数据抽取程序,因为高效稳定的性能,一直被广大使用者所喜爱,并且还在国内广受好评。因为其本身使用纯JAVA编写,所以其JAVA API使用起来自然也是非常简便。虽然其本身自带的组件已经非常好用,并且能够满足丰富的场景。但可能有些场景下,我们可能需要通过其他的方式来实现,本篇我们将介绍Kettle的JAVA API的使用。

一、环境搭建

Pentaho官方仓库:https://nexus.pentaho.org/content/groups/omni

核心jar包的pom.xml配置如下:

pentaho-kettle kettle-engine 4.4.0-stable pentaho-kettle kettle-core 4.4.0-stable pentaho-kettle kettle-db 4.4.0-stable

 

二、代码部分 1、初始化环境 public void initKettleEnvironment(HttpServletRequest request) throws KettleException { if (KettleEnvironment.isInitialized()) { return; } /** * 为避免在部分网络环境中无法完成初始化,需要自行处理 */ if (request == null) { // 运行环境初始化 KettleEnvironment.init(); } else { String userDir = System.getProperty("user.dir"); String kettleHome = request.getSession().getServletContext().getRealPath(File.separator "WEB-INF"); // 设置用户路径和系统环境,包括用户路径和主目录 System.setProperty("user.dir", kettleHome); System.setProperty("KETTLE_HOME", kettleHome); // 运行环境初始化 KettleEnvironment.init(); // 避免造成影响其他程序的运行,还原用户路径 System.setProperty("user.dir", userDir); } } 2、创建转化元

添加配置数组,配置转化元

public TransMeta buildTransMeta(String metaName, String... transXML) throws KettleXMLException { TransMeta transMeta = new TransMeta(); // 设置转化元的名称 transMeta.setName(metaName); // 添加转换的数据库连接 for (int i = 0; i < transXML.length; i ) { transMeta.addDatabase(new DatabaseMeta(transXML[i])); } return transMeta; } 3、添加日志(可选操作) public void setStepLogTable(TransMeta transMeta, String connDbName, String tableName) { VariableSpace space = new Variables(); // 将step日志数据库配置名加入到变量集中 space.setVariable(Const.KETTLE_TRANS_LOG_DB, connDbName); space.initializeVariablesFrom(null); StepLogTable stepLogTable = StepLogTable.getDefault(space, transMeta); // 配置StepLogTable使用的数据库配置名称 stepLogTable.setConnectionName(connDbName); // 设置Step日志的表名 stepLogTable.setTableName(tableName); // 设置TransMeta的StepLogTable transMeta.setStepLogTable(stepLogTable); } 4、创建插件注册器 public PluginRegistry getRegistry() { // 插件注册,用于注册转换中需要用到的插件 return PluginRegistry.getInstance(); } 5、设置表输入步骤元

该步骤用于获取源数据

/** * 设置表输入步骤 * @param transMeta * @param registry * @param sourceDbName * @param sql * @param stepName * @return */ public StepMeta setTableInputStep(TransMeta transMeta, PluginRegistry registry, String sourceDbName, String sql, String stepName) { // 创建表输入 TableInputMeta tableInputMeta = new TableInputMeta(); String pluginId = registry.getPluginId(StepPluginType.class, tableInputMeta); // 指定数据源数据库配置名 DatabaseMeta source = transMeta.findDatabase(sourceDbName); tableInputMeta.setDatabaseMeta(source); tableInputMeta.setSQL(sql); // 将表输入添加到转换中 StepMeta stepMeta = new StepMeta(pluginId, stepName, tableInputMeta); // 给步骤添加在spoon工具中的显示位置 stepMeta.setDraw(true); stepMeta.setLocation(100, 100); // 将表输入添加到步骤中 transMeta.addStep(stepMeta); return stepMeta; } 6、更新步骤元

该步骤用于将获取到的数据更新到目标数据库中

/** * 设置表输出步骤,用于整表抽取 * @param transMeta * @param registry * @param targetDbName * @param targetTableName * @param stepName * @return */ public StepMeta setTableOutput(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String stepName) { // 创建表输出 TableOutputMeta tableOutputMeta = new TableOutputMeta(); String pluginId = registry.getPluginId(StepPluginType.class, tableOutputMeta); // 配置表输出的目标数据库配置名 DatabaseMeta targetDb = transMeta.findDatabase(targetDbName); tableOutputMeta.setDatabaseMeta(targetDb); tableOutputMeta.setTableName(targetTableName); // 将表输出添加到转换中 StepMeta stepMeta = new StepMeta(pluginId, stepName, tableOutputMeta); transMeta.addStep(stepMeta); return stepMeta; } /** * 设置表插入与更新步骤,用于表中部分字段更新 * @param transMeta * @param registry * @param targetDbName * @param targetTableName * @param updatelookup lookup检索字段 * @param updateStream lookup更新字段 * @param updateStream2 lookup更新字段2 * @param conditions lookup条件 * @param updateOrNot lookup更新标记 * @param stepName * @return */ public StepMeta setInsertUpdateMeta(TransMeta transMeta, PluginRegistry registry, String targetDbName, String targetTableName, String[] updatelookup, String[] updateStream, String[] updateStream2, String[] conditions, Boolean[] updateOrNot, String stepName) { // 创建插入与更新 InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta(); String pluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta); // 配置目标数据库配置名 DatabaseMeta database_target = transMeta.findDatabase(targetDbName); insertUpdateMeta.setDatabaseMeta(database_target); // 设置目标表名 insertUpdateMeta.setTableName(targetTableName); // 设置用来查询的关键字 insertUpdateMeta.setKeyLookup(updatelookup); insertUpdateMeta.setKeyStream(updateStream); insertUpdateMeta.setKeyStream2(updateStream2);// 这一步不能省略 insertUpdateMeta.setKeyCondition(conditions); // 设置要更新的字段 insertUpdateMeta.setUpdateLookup(updatelookup); insertUpdateMeta.setUpdateStream(updateStream); insertUpdateMeta.setUpdate(updateOrNot); // 添加步骤到转换中 StepMeta stepMeta = new StepMeta(pluginId, stepName, insertUpdateMeta); stepMeta.setDraw(true); stepMeta.setLocation(250, 100); transMeta.addStep(stepMeta); return stepMeta; } 7、绑定关联步骤

该步骤用于将数据获取和导入更新的步骤关联绑定

/** * 用于将表输入步骤与第二步骤绑定 * @param transMeta * @param from * @param to */ public void addTransHop(TransMeta transMeta, StepMeta from, StepMeta to) { transMeta.addTransHop(new TransHopMeta(from, to)); } 8、执行抽取

执行数据抽取

/** * 执行抽取 * @param transMeta * @param targetDbName */ public void executeTrans(TransMeta transMeta, String targetDbName) { try { Database database = new Database(null, transMeta.findDatabase(targetDbName)); database.connect(); Trans trans = new Trans(transMeta); trans.execute(new String[] { "start..." }); trans.waitUntilFinished(); // 关闭数据库连接 database.disconnect(); if (trans.getErrors() > 0) { throw new RuntimeException("There were errors during transformation execution."); } } catch (KettleDatabaseException e) { e.printStackTrace(); } catch (KettleException e) { e.printStackTrace(); } } 9、抽取示例

数据库配置xml:

    smy     127.0.0.1     Mysql     Native     test_db     3306     root     123456                           USE_POOLING             Y                               EXTRA_OPTION_MYSQL.characterEncoding             utf8                               EXTRA_OPTION_MYSQL.defaultFetchSize             500                

运行函数:

public static void main(String[] args) { try { KettleClient client = new KettleClient(); client.initEnvironment(null); String transXML = ""; // 此处为上例的数据库配置 TransMeta meta = client.buildTransMeta("kettle", transXML); PluginRegistry registry = client.getRegistry(); StepMeta step1 = client.setTableInputStep(meta, registry, "kettle", "select * from test1", "table input"); StepMeta step2 = client.setTableOutput(meta, registry, "kettle", "test2", "table insert"); client.addTransHop(meta, step1, step2); client.executeTrans(meta, "kettle"); } catch (KettleException e) { e.printStackTrace(); } }

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3