java springboot生成kettle可执行ktr文件表输入/表输出/文本输入/javascript处理/kafka输出

您所在的位置:网站首页 kettle调用java java springboot生成kettle可执行ktr文件表输入/表输出/文本输入/javascript处理/kafka输出

java springboot生成kettle可执行ktr文件表输入/表输出/文本输入/javascript处理/kafka输出

2023-12-03 21:10| 来源: 网络整理| 查看: 265

不谦虚的说 这篇文章, 在csdn界, 填补了一项空白

java生成ktr文件实现表格输入输出,文本文件输入, javascript处理以及kafka输出 不谦虚的说 这篇文章, 在csdn界, 填补了一项空白源码前言1 开始之前需要做的事1.1 将本地jar文件生成maven仓库1.1.1 必须处理的jar文件1.1.2 可选的kafka插件jar文件 2 目录结构以及pom文件2.1 目录结构2.2 pom文件 3 数据库表输入到数据库表的插入更新3.1 java文件 4 文本文件输入 kafka输出 javascript处理5 总结6 灵感来源6.1 TextFileInputMeta6.2 KafkaProducerMeta6.3 ScriptValuesMetaMod

源码

话不多说, 先把源码放在这! https://github.com/rangewr/springboot_kettle

前言

最近有一个需求, 需要让用户通过用户勾选的方式, 选择数据处理的方式, 处理方式也是比较简单, 比如冒号分割, 文本替换等, 这些通过简单的js处理就可以, 但是用户是不懂js代码的, 只会在页面勾选处理方式, 那么就需要在java后台处理了 看到网上有很多kettle的web处理工具, 不适合我这个需求, 首先是项目太大了, 部署运行都需要时间, 而且部署完成之后, 用户用的需求就那么一两个, 那不就浪费资源了嘛, 然后又开始查资料, 能不能把ktr文件自己配置好, 然后让java去调用呢? 答案是可以的, 但是如果用户说, 我不想用冒号分割了, 我想用感叹号分割, 那我配置的ktr文件就需要修改, 能不能动态的生成js代码呢? 可以! 开始搜索java生成ktr, 网上千篇一律的都是在抄一个从数据库表获取数据, 写到另一个数据库表中去, 然后这些山寨版本的文章, 有几个还是不能运行的, 真是奇葩, 我想搜一下从文本文件输入的…然而一点都没有, 不知道为啥, 在这领域的文章, 我翻来覆去都没有找到相关的介绍, 我需要实现的是从文本文件获取数据, 经过javascript处理之后, 将数据传到kafka中, 然后logstash对kafka进行监听, 最终导入到elasticsearch中去, 然后我在文本文件获取中, 就碰壁了, 网上搜了一遍之后, 没有发现有什么有用的, 不过找到了一个从CSV文件获取数据, 这个好像差不多, 但是细节上还是不太对, 算了自己搞!

1 开始之前需要做的事

首先开始敲代码之前, 需要创建一个springboot项目, 因为需要maven引入很多jar包, 然后还需要自己配置几个maven的jar包, 都是需要先期操作的

1.1 将本地jar文件生成maven仓库

这里有3个jar文件是必须生成maven仓库的, 还有1个是kafka插件需要用的, 如果你的需求中不需要使用kafka, 你可以不处理这个jar文件(如果需要使用kafka插件的话, 这里假设你在kettle工具上已经可以使用这个插件了, 因为这个插件是需要自己配置的, 并不是kettle自带的, 这里假设你已经配置好了, 如果没有配置好, 那先暂停一会, 去把kafka插件配置到kettle中, 网上相关的文章很多)

1.1.1 必须处理的jar文件

首先在kettle的安装路径下lib文件夹中找到如下文件

kettle-core-8.2.0.0-342.jar kettle-engine-8.2.0.0-342.jar metastore-8.2.0.0-342.jar

在这里插入图片描述

将这三个文件移到项目目录下src/main/resources/lib中 打开cmd命令窗口, 进入到项目本目录下, 执行如下命令

mvn install:install-file -Dfile=src/main/resources/lib/kettle-core-8.2.0.0-342.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=8.2.0.0-342 -Dpackaging=jar mvn install:install-file -Dfile=src/main/resources/lib/kettle-engine-8.2.0.0-342.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=8.2.0.0-342 -Dpackaging=jar mvn install:install-file -Dfile=src/main/resources/lib/metastore-8.2.0.0-342.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=8.2.0.0-342 -Dpackaging=jar

执行完上述命令, 在本地的maven仓库, 就可以看到这几个文件了, 之后再在项目的pom文件中引用就可以正常用了

1.1.2 可选的kafka插件jar文件

kafka插件因为不是kettle自带的插件, 所以需要把jar包生成到仓库中去, 在kafka插件的安装路径下找到以下文件, 一般在kettle安装路径下的plugins\steps\pentaho-kafka-producer\中

pentaho-kafka-producer.jar

执行如下命令, 将jar文件生成到maven中

mvn install:install-file -Dfile=src/main/resources/lib/pentaho-kafka-producer.jar -DgroupId=pentaho-kettle -DartifactId=kafka-producer -Dversion=8.2.0.0.342 -Dpackaging=jar

至此, 前期准备工作就完成了 接下来直接上代码

2 目录结构以及pom文件 2.1 目录结构

在这里插入图片描述 TranSDemo.java文件是从表格输入, 到表格输出的处理文件 FileToKafka.java文件是从文件输入, 经过js处理, 最终导出到kafka的处理文件

2.2 pom文件 4.0.0 org.springframework.boot spring-boot-starter-parent 2.2.7.RELEASE com.auto_generat kettle 0.0.1 auto_generat_ktr auto_generat_ktr project for Spring Boot 1.8 org.springframework.boot spring-boot-starter-thymeleaf org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine pentaho-kettle kettle-core 8.2.0.0-342 pentaho-kettle kettle-engine 8.2.0.0-342 pentaho-kettle metastore 8.2.0.0-342 pentaho-kettle kafka-producer 8.2.0.0.342 commons-io commons-io 2.6 org.apache.commons commons-vfs2 2.6.0 com.google.guava guava 28.2-jre org.apache.storm storm-core 2.1.0 ${provided.scope} org.owasp.encoder encoder 1.2.2 mysql mysql-connector-java 5.1.49 org.scannotation scannotation 1.0.3 org.mozilla javascript 1.7.2 org.springframework.boot spring-boot-maven-plugin auto_generat_ktr 3 数据库表输入到数据库表的插入更新

话不多说, 直接上代码

3.1 java文件 /* * @Author: wangran * @Date: 2020-06-09 13:00:47 * @LastEditors: wangran * @LastEditTime: 2020-06-10 14:22:21 */ package com.auto_generat.kettle.generat; import java.io.File; import org.apache.commons.io.FileUtils; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.plugins.PluginRegistry; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.trans.TransHopMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta; import org.pentaho.di.trans.steps.tableinput.TableInputMeta; public class TransDemo { public static TransDemo transDemo; /** * 两个库中的表名 */ public static String bjdt_tablename = "test_table"; public static String kettle_tablename = "new_table"; /** * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml) */ public static final String[] databasesXML = { "" + "" + "bjdt" + "127.0.0.1" + "MYSQL" + "Native" + "range" + "3306" + "root" + "root" + "", "" + "" + "kettle" + "127.0.0.1" + "MYSQL" + "Native" + "range" + "3306" + "root" + "root" + "" }; /** * @param args */ public static void main(String[] args) { try { KettleEnvironment.init(); transDemo = new TransDemo(); TransMeta transMeta = transDemo.generateMyOwnTrans(); String transXml = " \n" + transMeta.getXML(); // System.out.println("transXml:"+transXml); String transName = "C:\\Users\\Administrator\\Desktop\\deskTopFolder\\copy-one-table.ktr"; File file = new File(transName); FileUtils.writeStringToFile(file, transXml, "UTF-8"); // System.out.println(databasesXML.length+"\n"+databasesXML[0]+"\n"+databasesXML[1]); } catch (Exception e) { e.printStackTrace(); return; } } /** * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作 * * @return * @throws KettleXMLException */ public TransMeta generateMyOwnTrans() throws KettleXMLException { System.out.println("************start to generate my own transformation***********"); TransMeta transMeta = new TransMeta(); // 设置转化的名称 transMeta.setName("insert_update"); // 添加转换的数据库连接 for (int i = 0; i "ID" }); insertUpdateMeta.setKeyStream(new String[] { "ID" }); insertUpdateMeta.setKeyStream2(new String[] { "" });// 一定要加上 insertUpdateMeta.setKeyCondition(new String[] { "=" }); // 设置要更新的字段 String[] updatelookup = { "ID", "NAME", "CODE" }; String[] updateStream = { "ID", "NAME", "CODE" }; Boolean[] updateOrNot = { false, true, true }; insertUpdateMeta.setUpdateLookup(updatelookup); insertUpdateMeta.setUpdateStream(updateStream); insertUpdateMeta.setUpdate(updateOrNot); String[] lookup = insertUpdateMeta.getUpdateLookup(); System.out.println("******:" + lookup[1]); // 添加步骤到转换中 StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update", insertUpdateMeta); insertUpdateStep.setDraw(true); insertUpdateStep.setLocation(250, 100); transMeta.addStep(insertUpdateStep); // ****************************************************************** // ****************************************************************** // 添加hop把两个步骤关联起来 transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep)); System.out.println("***********the end************"); return transMeta; } } 4 文本文件输入 kafka输出 javascript处理

直接贴代码 拿回家就可以用

/* * @Author: wangran * @Date: 2020-06-09 16:11:58 * @LastEditors: wangran * @LastEditTime: 2020-06-10 14:20:22 */ package com.auto_generat.kettle.generat; import java.io.File; import org.apache.commons.io.FileUtils; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.plugins.PluginFolder; import org.pentaho.di.core.plugins.PluginRegistry; import org.pentaho.di.core.plugins.StepPluginType; import org.pentaho.di.core.row.ValueMetaInterface; import org.pentaho.di.trans.TransHopMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.kafka.producer.KafkaProducerMeta; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.steps.file.BaseFileField; import org.pentaho.di.trans.steps.fileinput.text.TextFileFilter; import org.pentaho.di.trans.steps.fileinput.text.TextFileInputMeta; import org.pentaho.di.trans.steps.scriptvalues_mod.ScriptValuesMetaMod; import org.pentaho.di.trans.steps.scriptvalues_mod.ScriptValuesScript; public class FileToKafkaTrans { public static FileToKafkaTrans fileToKafkaTrans; public static void main(String[] args) { try { StepPluginType.getInstance().getPluginFolders() .add(new PluginFolder( "E:\\pdi-ce-8.2.0.0-342\\data-integration\\plugins\\steps\\pentaho-kafka-producer", false, true)); KettleEnvironment.init(); fileToKafkaTrans = new FileToKafkaTrans(); TransMeta transMeta = fileToKafkaTrans.generateMyOwnTrans(); String transXml = " \n" + transMeta.getXML(); // 拼接ktr文件的文件头 String transName = "C:\\deskTopFolder\\generate_file_kafka.ktr"; File file = new File(transName); FileUtils.writeStringToFile(file, transXml, "UTF-8"); } catch (Exception e) { e.printStackTrace(); return; } } /** * 生成一个转化,从txt文件中获取数据, 经过javascript处理, 再将整条数据存入kafka中 * * @return * @throws KettleXMLException */ public TransMeta generateMyOwnTrans() throws KettleXMLException { System.out.println("************start to generate my own transformation***********"); TransMeta transMeta = new TransMeta(); transMeta.setName("file_kafka"); // registry是给每个步骤生成一个标识Id用 PluginRegistry registry = PluginRegistry.getInstance(); // 文本输入 TextFileInputMeta textInput = new TextFileInputMeta();// 定义Meta String textInputPluginId = registry.getPluginId(StepPluginType.class, textInput);// 生成id textInput.setDefault();// 配置默认属性 String transPath = "E:\\pdi-ce-8.2.0.0-342\\data-integration\\export_file\\hello.txt"; textInput.setFileName(new String[] { transPath }); textInput.setFilter(new TextFileFilter[0]); // 过滤器 textInput.content.fileFormat = "unix"; // 文件格式 textInput.content.fileType = "CSV"; // 文件类型 BaseFileField field = new BaseFileField(); field.setName("pwd"); // 单条数据的键 field.setTrimType(ValueMetaInterface.TRIM_TYPE_BOTH); // 去除空格方式 field.setType("String");// 字段类型 textInput.inputFields = new BaseFileField[] { field }; // 设置列赋值 StepMeta TextInputMetaStep = new StepMeta(textInputPluginId, "text input", textInput);// 生成StepMeta TextInputMetaStep.setDraw(true);// 显示该控件 TextInputMetaStep.setLocation(400, 300);// 显示位置, xy坐标 transMeta.addStep(TextInputMetaStep); // js脚本配置 ScriptValuesScript jsScript = new ScriptValuesScript(0, "Script0", "var result = pwd+\"hello\";");// 创建js脚本代码 ScriptValuesMetaMod scriptValuesModMeta = new ScriptValuesMetaMod(); // 定义meta String scriptValuesModPluginId = registry.getPluginId(StepPluginType.class, scriptValuesModMeta); scriptValuesModMeta.setDefault(); scriptValuesModMeta.setJSScripts(new ScriptValuesScript[] { jsScript });// 设置脚本 scriptValuesModMeta.setFieldname(new String[] { "result" });// 输出列 scriptValuesModMeta.setRename(new String[] { "result" });// 输出列别名 /** * 字段类型: 0为None, 1为Number, 2为String, 3为Date, 4为Boolean, 5为Integer, 6为BinNumber, * 7为Serializable, 8为Binary, 9为Timestamp, 10为Internet Address */ scriptValuesModMeta.setType(new int[] { 2 });// 根据上述配置字段类型 scriptValuesModMeta.setLength(new int[] { -1 });// 长度, -1表示不进行设置 scriptValuesModMeta.setPrecision(new int[] { -1 });// 精度, -1表示不进行设置 scriptValuesModMeta.setReplace(new boolean[] { false });// 是否替换 "Fieldname"或"Rename to"值 StepMeta javaScriptMetaStep = new StepMeta(scriptValuesModPluginId, "script option", scriptValuesModMeta); javaScriptMetaStep.setDraw(true); javaScriptMetaStep.setLocation(600, 300); transMeta.addStep(javaScriptMetaStep); // kafka 插件 KafkaProducerMeta kafkaProducer = new KafkaProducerMeta(); String KafkaProducerPluginId = registry.getPluginId(StepPluginType.class, kafkaProducer); kafkaProducer.setDefault();// 默认设置 kafkaProducer.setMessageField("pwd");// 获取数据来源的字段 kafkaProducer.setTopic("logstash_json");// topic kafkaProducer.getKafkaProperties().put("metadata.broker.list", "192.168.5.39:9092");// 对值进行引用传递设置kafka的ip:port StepMeta KafkaProducerMetaStep = new StepMeta(KafkaProducerPluginId, "kafka output", kafkaProducer); KafkaProducerMetaStep.setDraw(true); KafkaProducerMetaStep.setLocation(800, 300); transMeta.addStep(KafkaProducerMetaStep); // 添加hop把步骤关联起来, 相当于连线 transMeta.addTransHop(new TransHopMeta(TextInputMetaStep, javaScriptMetaStep)); transMeta.addTransHop(new TransHopMeta(javaScriptMetaStep, KafkaProducerMetaStep)); System.out.println("***********the end************"); return transMeta; } } 5 总结

这几天一直在搞这个东西, 但是奈何国内的网站真的是没有相关的文章啊, 难道是研究这些东西的人, 都那么吝惜自己的文笔? 不愿意分享出来? 反正我是被逼急了, 跳墙出去找这些东西的源码, 最后才实现了这个需求 等会…急了??? 跳墙??? 好像哪里不太对…

6 灵感来源 6.1 TextFileInputMeta

文本文件输入的配置, 是在这找到的, 然后自己略加调整 在这里插入图片描述

6.2 KafkaProducerMeta

kafka生产者的灵感来源于这里 网址:https://www.programcreek.com/java-api-examples/?code=RuckusWirelessIL/pentaho-kafka-producer/pentaho-kafka-producer-master/src/main/java/org/pentaho/di/trans/kafka/producer/KafkaProducerMeta.java

package org.pentaho.di.trans.kafka.producer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.pentaho.di.core.CheckResult; import org.pentaho.di.core.CheckResultInterface; import org.pentaho.di.core.Const; import org.pentaho.di.core.Counter; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.xml.XMLHandler; import org.pentaho.di.repository.ObjectId; import org.pentaho.di.repository.Repository; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.BaseStepMeta; import org.pentaho.di.trans.step.StepDataInterface; import org.pentaho.di.trans.step.StepInterface; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.step.StepMetaInterface; import org.w3c.dom.Node; import kafka.producer.ProducerConfig; import org.eclipse.swt.widgets.Shell; import org.pentaho.di.core.annotations.Step; import org.pentaho.di.trans.step.StepDialogInterface; /** * Kafka Producer step definitions and serializer to/from XML and to/from Kettle * repository. * * @author Michael Spector */ @Step( id = "KafkaProducer", image = "org/pentaho/di/trans/kafka/producer/resources/kafka_producer.png", i18nPackageName="org.pentaho.di.trans.kafka.producer", name="KafkaProducerDialog.Shell.Title", description = "KafkaProducerDialog.Shell.Tooltip", categoryDescription="i18n:org.pentaho.di.trans.step:BaseStep.Category.Output") public class KafkaProducerMeta extends BaseStepMeta implements StepMetaInterface { public static final String[] KAFKA_PROPERTIES_NAMES = new String[] { "metadata.broker.list", "request.required.acks", "producer.type", "serializer.class", "request.timeout.ms", "key.serializer.class", "partitioner.class", "compression.codec", "compressed.topics", "message.send.max.retries", "retry.backoff.ms", "topic.metadata.refresh.interval.ms", "queue.buffering.max.ms", "queue.buffering.max.messages", "queue.enqueue.timeout.ms", "batch.num.messages", "send.buffer.bytes", "client.id" }; public static final Map KAFKA_PROPERTIES_DEFAULTS = new HashMap(); static { KAFKA_PROPERTIES_DEFAULTS.put("metadata.broker.list", "localhost:9092"); KAFKA_PROPERTIES_DEFAULTS.put("request.required.acks", "1"); KAFKA_PROPERTIES_DEFAULTS.put("producer.type", "sync"); KAFKA_PROPERTIES_DEFAULTS.put("serializer.class", "kafka.serializer.DefaultEncoder"); } private Properties kafkaProperties = new Properties(); private String topic; private String messageField; private String keyField; public Properties getKafkaProperties() { return kafkaProperties; } /** * @return Kafka topic name */ public String getTopic() { return topic; } /** * @param topic * Kafka topic name */ public void setTopic(String topic) { this.topic = topic; } /** * @return Target key field name in Kettle stream */ public String getKeyField() { return keyField; } /** * @param field * Target key field name in Kettle stream */ public void setKeyField(String field) { this.keyField = field; } /** * @return Target message field name in Kettle stream */ public String getMessageField() { return messageField; } /** * @param field * Target message field name in Kettle stream */ public void setMessageField(String field) { this.messageField = field; } public void check(List remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev, String input[], String output[], RowMetaInterface info) { if (isEmpty(topic)) { remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, Messages.getString("KafkaProducerMeta.Check.InvalidTopic"), stepMeta)); } if (isEmpty(messageField)) { remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, Messages.getString("KafkaProducerMeta.Check.InvalidMessageField"), stepMeta)); } try { new ProducerConfig(kafkaProperties); } catch (IllegalArgumentException e) { remarks.add(new CheckResult(CheckResultInterface.TYPE_RESULT_ERROR, e.getMessage(), stepMeta)); } } public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta transMeta, Trans trans) { return new KafkaProducer(stepMeta, stepDataInterface, cnr, transMeta, trans); } public StepDataInterface getStepData() { return new KafkaProducerData(); } public void loadXML(Node stepnode, List databases, Map counters) throws KettleXMLException { try { topic = XMLHandler.getTagValue(stepnode, "TOPIC"); messageField = XMLHandler.getTagValue(stepnode, "FIELD"); keyField = XMLHandler.getTagValue(stepnode, "KEYFIELD"); Node kafkaNode = XMLHandler.getSubNode(stepnode, "KAFKA"); String[] kafkaElements = XMLHandler.getNodeElements(kafkaNode); if (kafkaElements != null) { for (String propName : kafkaElements) { String value = XMLHandler.getTagValue(kafkaNode, propName); if (value != null) { kafkaProperties.put(propName, value); } } } } catch (Exception e) { throw new KettleXMLException(Messages.getString("KafkaProducerMeta.Exception.loadXml"), e); } } public String getXML() throws KettleException { StringBuilder retval = new StringBuilder(); if (topic != null) { retval.append(" ").append(XMLHandler.addTagValue("TOPIC", topic)); } if (messageField != null) { retval.append(" ").append(XMLHandler.addTagValue("FIELD", messageField)); } if (keyField != null) { retval.append(" ").append(XMLHandler.addTagValue("KEYFIELD", keyField)); } retval.append(" ").append(XMLHandler.openTag("KAFKA")).append(Const.CR); for (String name : kafkaProperties.stringPropertyNames()) { String value = kafkaProperties.getProperty(name); if (value != null) { retval.append(" " + XMLHandler.addTagValue(name, value)); } } retval.append(" ").append(XMLHandler.closeTag("KAFKA")).append(Const.CR); return retval.toString(); } public void readRep(Repository rep, ObjectId stepId, List databases, Map counters) throws KettleException { try { topic = rep.getStepAttributeString(stepId, "TOPIC"); messageField = rep.getStepAttributeString(stepId, "FIELD"); keyField = rep.getStepAttributeString(stepId, "KEYFIELD"); String kafkaPropsXML = rep.getStepAttributeString(stepId, "KAFKA"); if (kafkaPropsXML != null) { kafkaProperties.loadFromXML(new ByteArrayInputStream(kafkaPropsXML.getBytes())); } // Support old versions: for (String name : KAFKA_PROPERTIES_NAMES) { String value = rep.getStepAttributeString(stepId, name); if (value != null) { kafkaProperties.put(name, value); } } } catch (Exception e) { throw new KettleException("KafkaProducerMeta.Exception.loadRep", e); } } public void saveRep(Repository rep, ObjectId transformationId, ObjectId stepId) throws KettleException { try { if (topic != null) { rep.saveStepAttribute(transformationId, stepId, "TOPIC", topic); } if (messageField != null) { rep.saveStepAttribute(transformationId, stepId, "FIELD", messageField); } if (keyField != null) { rep.saveStepAttribute(transformationId, stepId, "KEYFIELD", keyField); } ByteArrayOutputStream buf = new ByteArrayOutputStream(); kafkaProperties.storeToXML(buf, null); rep.saveStepAttribute(transformationId, stepId, "KAFKA", buf.toString()); } catch (Exception e) { throw new KettleException("KafkaProducerMeta.Exception.saveRep", e); } } public void setDefault() { } public static boolean isEmpty(String str) { return str == null || str.length() == 0; } }

这是看到的源代码, 跟编辑器中Ctrl点击去的代码不太一样, 在这里有用的数据就是这个KAFKA_PROPERTIES_DEFAULTS 这里面配置了一些默认设置, 我们需要修改metadata.broker.list这个属性, 修改为自己的ip 但是在这里找不到这个属性的set方法, 没办法set值 考验一个java程序猿的基本功的时候到了, 看看我在代码中是如何设置的

KafkaProducerMeta kafkaProducer = new KafkaProducerMeta(); String KafkaProducerPluginId = registry.getPluginId(StepPluginType.class, kafkaProducer); kafkaProducer.setDefault();// 默认设置 kafkaProducer.setMessageField("pwd");// 获取数据来源的字段 kafkaProducer.setTopic("logstash_json");// topic kafkaProducer.getKafkaProperties().put("metadata.broker.list", "192.168.5.39:9092");// 对值进行引用传递设置kafka的ip:port StepMeta KafkaProducerMetaStep = new StepMeta(KafkaProducerPluginId, "kafka output", kafkaProducer); KafkaProducerMetaStep.setDraw(true); KafkaProducerMetaStep.setLocation(800, 300); transMeta.addStep(KafkaProducerMetaStep);

看到注释的东西, 我想你就已经明白了, 如果不明白的话, 我在这里做个解释 通过getKafkaProperties()方法, 可以获取到他的KafkaProperites文件, 在KafkaProducerMeta的源码中没有看到将KAFKA_PROPERTIES_DEFAULTS中的内设置到kafkaProperites中, 但是通过变量的名字也可以推断出这里他俩的关系不一般, 那么我们对kafkaProperties进行操作, 是不是KafkaProducerMeta在加载配置的时候, 就会以kafkaProperites为准呢?答案是肯定的 这里需要介绍一个知识点, 就是java的值传递和引用传递的问题, 我们在这里这样配置, 为什么就可以修改掉KafkaProducerMeta中的private属性呢?

// 我的配置 kafkaProducer.getKafkaProperties().put("metadata.broker.list", "192.168.5.39:9092"); // 源代码 private Properties kafkaProperties = new Properties(); private String topic; private String messageField; private String keyField; public Properties getKafkaProperties() { return kafkaProperties; }

这里getKafkaProperties()方法中返回的是属性kafkaProperties的值, 也就是private属性他的真身, 那么拿到自己的代码中 , kafkaProducer.getKafkaProperties()方法的结果也是kafkaProperties的真身, 如果这时用一个新的变量去接收它, 那么这个新的变量就是对真身的值的一个复制了, 就不再是真身了, 所以, 为什么通过方法传参数的形式修改参数的值, 不会影响原来的真身, 因为方法的参数, 都是新定义的一个变量嘛… 所以在这里获取到kafkaProperties属性真身之后, 再对其进行put值, 那不就是对KafkaProducerMeta的private属性真身进行操作了嘛… 你地? 明白??? 值传递和引用传递的事而已, java程序猿的基本功啊!!!

6.3 ScriptValuesMetaMod

这里注意啊, 这个类是比较特殊的, 你看前两个插件:文本文件和kafka生产者, 都是以Meta结尾的java文件, 这个却是把Meta放到了中间, 这可是把我坑够呛啊 想要自己通过源码找对应的类型, 就得自己通过kettle可视化工具找到所在分类 在这里插入图片描述 这个javascript是在脚本里, 那我就去找script文件夹呗…一顿操作猛如虎, 顺藤摸瓜一路找…就找到这了 在这里插入图片描述 你还别说, 到后来我发现他俩还真挺像的, 我用这个类一顿找啊, 然后各种配置, 最后运行代码也通过, 但是执行ktr文件的时候, 提示我有错误, 又是空指针, 又是数组下标越界啥的, 我就整了一个这样的结构 在这里插入图片描述 在kettle工具中拖拽出来一个javaScript代码, 连上线, 然后保存, 我去看一下ktr文件, 看看他俩有啥区别… 在这里插入图片描述 就在这个位置, 他俩的type是不一样的, 我用java生成的type是Script, 拖拽出来的type是ScriptValueMod, 这就让我有点不解了, 这是通过什么配置的吗? 无意间想起了在配置javascript代码时, 有这么一行代码

// js脚本配置 ScriptValuesScript jsScript = new ScriptValuesScript(0, "Script0", "var pwd = pwd+\"hello\";");// 创建js脚本代码

其中ScriptValuesScript是需要导包的, 这里让我选择一下是使用哪个包

org.pentaho.di.trans.steps.script org.pentaho.di.trans.steps.scriptvalues_mod

依稀记得见到过有mod字样的一个类, 然后我就想起来这个ktr文件中的这个ScriptValueMod类型, 是不是用错java类了? 然后又去jar包中重新找… 在这里插入图片描述 然后就在这里找到了这个scriptvalues_mod文件夹, 有点像啊, 进去试试吧… 还真的有…两个??? 在这里插入图片描述 我第一直觉是, 应该用下面的那个啊 , 长得多像啊, 肯定亲生的, 但是在实例化的时候, 让我传5个参数, 我都蒙了… 在这里插入图片描述

public ScriptValuesMod(org.pentaho.di.trans.step.StepMeta stepMeta, org.pentaho.di.trans.step.StepDataInterface stepDataInterface, int copyNr, org.pentaho.di.trans.TransMeta transMeta, org.pentaho.di.trans.Trans trans) { }

这都是啥啊? 咋传啊? 全传null??? 又回去看了一眼jar包中的class文件, 看到了那个ScriptValuesMetaMod.class, 抱着试一试的心态, 把它实例化了, 哎! 有无参构造方法, 看一下源代码…哎! 跟之前的ScriptMeta很像啊, 然后就开始配置, 再把js代码配置的引入包改成

import org.pentaho.di.trans.steps.scriptvalues_mod;

接下来就清清爽爽了…

finish… OK 我的故事讲完了, 又是没有bug的一天, 美滋滋 , 今天下班有点晚了… 在这里插入图片描述 回家 撤退!!!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3