Submitting Flink Jobs from Code




Preface

This article uses Flink 1.12.7.

Submitting a job from code

Prepare the directories and files

hadoop fs -mkdir -p /jar/userTask
hadoop fs -mkdir -p /jar/flink12/libdist
hadoop fs -mkdir -p /jar/flink12/lib

Copy the required files

hadoop fs -put $FLINK_HOME/examples/batch/WordCount.jar /jar/userTask/WordCount.jar
hadoop fs -put $FLINK_HOME/lib/flink-dist_2.12-1.12.7.jar /jar/flink12/libdist/flink-dist_2.12-1.12.7.jar
hadoop fs -put $FLINK_HOME/lib/* /jar/flink12/lib/
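
To confirm the uploads, you can list the target directories:

hadoop fs -ls /jar/userTask
hadoop fs -ls /jar/flink12/lib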

You can also browse the uploaded files through the NameNode web UI:

http://hadoop01:50070/explorer.html#/

http://hadoop02:50070/explorer.html#/

Run a quick test on the server:

flink run-application -t yarn-application hdfs://hacluster/jar/userTask/WordCount.jar --output hdfs://hacluster/bigdata_study/output03
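
When the job finishes, you can inspect the result. List the output path first to see whether it is a single file or a directory of part files, then cat accordingly:

hadoop fs -ls hdfs://hacluster/bigdata_study/output03
hadoop fs -cat hdfs://hacluster/bigdata_study/output03/*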

Add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
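
The dependency above only covers the YARN deployment classes. To compile and run the submitter below you will typically also need the Flink client and Hadoop client jars; a sketch of the extra coordinates, assuming the same flink.version and scala.binary.version properties and a Hadoop 2.7.7 cluster:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>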

Code

package cn.psvmc;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

public class RunFlinkJob {
    public static void main(String[] args) {
        // Local Flink configuration directory, used to load the Flink configuration.
        // If you hit "org.apache.flink.streaming.runtime.tasks.StreamTaskException:
        // Cannot instantiate user function." add the following to flink-conf.yaml:
        //   classloader.resolve-order: parent-first
        String configurationDirectory = "/data/tools/bigdata/flink-1.12.7/conf";
        // HDFS directory holding the Flink cluster jars
        String flinkLibs = "hdfs://hacluster/jar/flink12/lib";
        // The user jar
        String userJarPath = "hdfs://hacluster/jar/userTask/WordCount.jar";
        String flinkDistJar = "hdfs://hacluster/jar/flink12/libdist/flink-dist_2.12-1.12.7.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/yarn-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/hdfs-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/core-site.xml"));
        YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        YarnClusterInformationRetriever clusterInformationRetriever =
                YarnClientYarnClusterInformationRetriever.create(yarnClient);

        // Load the Flink configuration
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
        flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));

        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString())
        );
        // flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);

        // Use APPLICATION deployment mode
        flinkConfiguration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());

        // YARN application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "zApplication");
        // flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
        // flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));

        // Configure logging; without this no logs show up in YARN
        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

        ClusterSpecification clusterSpecification =
                new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();

        // Program arguments and main class of the user jar
        // ApplicationConfiguration appConfig = new ApplicationConfiguration(args, "org.apache.flink.examples.java.wordcount.WordCount");
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);

        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true
        );
        try {
            ClusterClientProvider<ApplicationId> clusterClientProvider =
                    yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig);
            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
            ApplicationId applicationId = clusterClient.getClusterId();
            String webInterfaceURL = clusterClient.getWebInterfaceURL();
            System.out.println("applicationId is " + applicationId);
            System.out.println("webInterfaceURL is " + webInterfaceURL);
            // To shut the cluster down:
            // yarnClusterDescriptor.killCluster(applicationId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
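
After deployApplicationCluster returns, the ClusterClient can do more than print the web URL. A minimal sketch (Flink 1.12.x client API; this would sit inside the try block above and needs java.util.Collection plus org.apache.flink.runtime.client.JobStatusMessage imports):

// List the jobs running on the freshly deployed application cluster.
// listJobs() returns a CompletableFuture, so get() blocks until YARN responds.
Collection<JobStatusMessage> jobs = clusterClient.listJobs().get();
for (JobStatusMessage job : jobs) {
    System.out.println(job.getJobId() + " [" + job.getJobName() + "] " + job.getJobState());
}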

Check the YARN web UI:

http://hadoop02:8088/cluster
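
The same information is available from the YARN command line, for example:

yarn application -list
yarn application -kill <applicationId>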

Submitting via a remote shell command (JSch)

package cn.psvmc;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ConnectionSSH {
    public static void main(String[] args) throws JSchException, IOException {
        JSch jsch = new JSch();
        String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
        jsch.addIdentity(pubKeyPath);
        String username = "root";
        String host = "192.168.7.101";
        Session session = jsch.getSession(username, host, 22); // prepare the connection
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();

        String command = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
        ChannelExec channel = (ChannelExec) session.openChannel("exec");
        channel.setCommand(command);
        BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
        channel.connect();
        String msg;
        while ((msg = in.readLine()) != null) {
            System.out.println(msg);
        }
        channel.disconnect();
        session.disconnect();
    }
}
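
The loop above only captures stdout. A sketch (same session and channel as above; setErrStream must be called before channel.connect(), and Thread.sleep adds InterruptedException to the throws clause) that also surfaces stderr and the remote exit code, which helps when the flink command fails:

channel.setErrStream(System.err); // forward remote stderr; set before connect()
channel.connect();
String msg;
while ((msg = in.readLine()) != null) {
    System.out.println(msg);
}
// Wait for the channel to close so the exit status becomes available
while (!channel.isClosed()) {
    Thread.sleep(100);
}
System.out.println("exit status: " + channel.getExitStatus());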

Using a password

JSch jsch = new JSch();
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the connection
session.setConfig("StrictHostKeyChecking", "no");
session.setPassword("zhangjian");
session.connect();

Using a key

JSch jsch = new JSch();
String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session = jsch.getSession(username, host, 22); // prepare the connection
session.setConfig("StrictHostKeyChecking", "no");
session.connect();
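
If the private key itself is protected by a passphrase, addIdentity accepts it as a second argument (the passphrase value here is a placeholder):

jsch.addIdentity(pubKeyPath, "myPassphrase"); // "myPassphrase" is a placeholder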

Submitting via a remote shell command (ganymed-ssh2)

Besides running commands, this utility class can also copy files between machines.

Dependency:

<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build210</version>
</dependency>

Utility class

package cn.psvmc;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import java.io.*;

/**
 * Connects to a Linux server and executes shell commands there.
 * Note: the class holds a single static Connection, so it is not thread-safe;
 * each login()/runCmd() sequence must complete before the next one starts.
 */
public class ConnectLinuxCommand {
    private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);
    private static final String DEFAULT_CHARSET = "UTF-8";
    private static Connection conn;

    /**
     * Log in to the remote Linux server with username and password.
     *
     * @return Boolean
     */
    public static Boolean login(RemoteConnect remoteConnect) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword()); // authenticate
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
        boolean flag = true;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect(); // connect
            boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
            System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
            logger.info("Authentication succeeded!");
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * Log in to the remote Linux server with a private key file.
     *
     * @param remoteConnect connection info
     * @param keyFile       file containing the user's DSA or RSA private key in OpenSSH (PEM)
     *                      format; the "-----BEGIN DSA PRIVATE KEY-----" or
     *                      "-----BEGIN RSA PRIVATE KEY-----" markers must be present
     * @param keyfilePass   passphrase for the key file if it is encrypted, otherwise null
     * @return Boolean
     */
    public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * Log in to the remote Linux server with a private key held in memory.
     *
     * @param remoteConnect connection info
     * @param keys          char[] containing the user's DSA or RSA private key in OpenSSH
     *                      format, including the BEGIN/END markers; may contain newlines
     * @param keyPass       passphrase if the key is encrypted, otherwise null
     * @return Boolean
     */
    public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // authenticate
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
            if (flag) {
                logger.info("Authentication succeeded!");
            } else {
                logger.error("Authentication failed!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * Execute a shell script or command remotely.
     *
     * @param cmd shell command
     * @return the command's output
     */
    public static String runCmd(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd);             // run the command
            result = processStdout(session.getStdout(), DEFAULT_CHARSET);
            // If stdout is empty, the command most likely failed; read stderr instead
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULT_CHARSET);
            }
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Execute a shell script or command remotely.
     *
     * @return the command's stdout on success; an empty string (never null) on failure
     */
    public static String runCmdSuccess(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession(); // open a session
            session.execCommand(cmd);             // run the command
            result = processStdout(session.getStdout(), DEFAULT_CHARSET);
            session.close();
            conn.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * Parse the output of a remote command.
     *
     * @param in      input stream
     * @param charset encoding
     * @return the output as plain text
     */
    public static String processStdout(InputStream in, String charset) {
        InputStream stdout = new StreamGobbler(in);
        StringBuilder buffer = new StringBuilder();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line;
            while ((line = br.readLine()) != null) {
                buffer.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }

    /**
     * Connect with username/password and run a command.
     */
    public static String runCmd(String ip, String userName, String password, String commandStr) {
        logger.info("ConnectLinuxCommand runCmd===" + "ip:" + ip + " userName:" + userName + " commandStr:" + commandStr);
        String returnStr = "";
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        try {
            if (login(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnStr;
    }

    public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
        logger.info("ConnectLinuxCommand connectLinuxWithoutPwd===" + "ip:" + ip + " userName:" + userName + " commandStr:" + commandStr);
        String returnStr = "";
        boolean result = true;
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        try {
            if (loginWithoutPwd(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    /**
     * Copy a file from a remote server to a local directory.
     *
     * @param password   password on the remote server
     * @param remoteFile file location on the remote server
     * @param localDir   local target directory
     */
    public static void scpPull(String ip, String userName, String password, String remoteFile, String localDir) throws IOException {
        logger.info("ConnectLinuxCommand scpPull===" + "ip:" + ip + " userName:" + userName + " remoteFile:" + remoteFile + " localDir:" + localDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.get(remoteFile, localDir);
            conn.close();
        }
    }

    /**
     * Copy a local file to a remote server.
     *
     * @param ip        remote IP
     * @param userName  remote username
     * @param password  remote password
     * @param localFile local file
     * @param remoteDir remote target directory
     * @throws IOException on failure
     */
    public static void scpPush(String ip, String userName, String password, String localFile, String remoteDir) throws IOException {
        logger.info("ConnectLinuxCommand scpPush===" + "ip:" + ip + " userName:" + userName + " localFile:" + localFile + " remoteDir:" + remoteDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.put(localFile, remoteDir);
            conn.close();
        }
    }
}

RemoteConnect

public class RemoteConnect {
    String ip;
    String userName;
    String password;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Test

package cn.psvmc;

public class CLCTest {
    public static void main(String[] args) {
        mTest1();
    }

    public static void mTest1() {
        System.out.println("--------------------------------------");
        String commandStr = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
        String result = ConnectLinuxCommand.runCmd("192.168.7.101", "root", "zhangjian", commandStr);
        System.out.println("Result: " + result);
        System.out.println("--------------------------------------");
    }

    public static void mTest2() {
        try {
            ConnectLinuxCommand.scpPull("192.168.7.101", "root", "zhangjian", "/root/test.txt", "d:/aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void mTest3() {
        try {
            ConnectLinuxCommand.scpPush("192.168.7.101", "root", "zhangjian", "d:/aa/test2.txt", "/root/");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

