如何在IDEA上编写Spark程序?(本地+集群+java三种模式书写代码)

您所在的位置:网站首页 在idea中使用spark处理数据超过80条就报错 如何在IDEA上编写Spark程序?(本地+集群+java三种模式书写代码)

如何在IDEA上编写Spark程序?(本地+集群+java三种模式书写代码)

2024-05-14 21:21| 来源: 网络整理| 查看: 265

本篇博客,Alice为大家带来关于如何在IDEA上编写Spark程序的教程。

在这里插入图片描述在这里插入图片描述写在前面

本次讲解我会通过一个非常经典的案例,同时也是在学MapReduce入门时少不了的一个例子——WordCount 来完成不同场景下Spark程序代码的书写。大家可以在敲代码时可以思考这样一个问题,用Spark是不是真的比MapReduce简便?

准备材料

wordcount.txt

代码语言:javascript复制hello me you her hello you her hello her hello图解WordCount在这里插入图片描述在这里插入图片描述pom.xml创建Maven项目并补全目录、配置pom.xml代码语言:javascript复制 4.0.0 com.czxy spark_demo 1.0-SNAPSHOT aliyun http://maven.aliyun.com/nexus/content/groups/public/ cloudera https://repository.cloudera.com/artifactory/cloudera-repos/ jboss http://repository.jboss.com/nexus/content/groups/public 1.8 1.8 UTF-8 2.11.8 2.11 2.7.4 2.2.0 org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.spark spark-sql_2.11 ${spark.version} org.apache.spark spark-hive_2.11 ${spark.version} org.apache.spark spark-hive-thriftserver_2.11 ${spark.version} org.apache.spark spark-streaming_2.11 ${spark.version} org.apache.spark spark-streaming-kafka-0-10_2.11 ${spark.version} org.apache.spark spark-sql-kafka-0-10_2.11 ${spark.version} org.apache.hadoop hadoop-client 2.7.4 org.apache.hbase hbase-client 1.3.1 org.apache.hbase hbase-server 1.3.1 com.typesafe config 1.3.3 mysql mysql-connector-java 5.1.38 src/main/java src/test/scala org.apache.maven.plugins maven-compiler-plugin 3.5.1 net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile -dependencyfile ${project.build.directory}/.scala_dependencies org.apache.maven.plugins maven-surefire-plugin 2.18.1 false true **/*Test.* **/*Suite.* org.apache.maven.plugins maven-shade-plugin 2.3 package shade *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA maven-assembly-plugin和maven-shade-plugin的区别

可以参考这篇博客https://blog.csdn.net/lisheng19870305/article/details/88300951

本地执行代码语言:javascript复制package com.czxy.scala import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /* * @Auther: Alice菌 * @Date: 2020/2/19 08:39 * @Description: 流年笑掷 未来可期。以梦为马,不负韶华! */ /** * 本地运行 */ object Spark_wordcount { def main(args: Array[String]): Unit = { // 1.创建SparkContext var config = new SparkConf().setAppName("wc").setMaster("local[*]") val sc = new SparkContext(config) sc.setLogLevel("WARN") // 2.读取文件 // A Resilient Distributed Dataset (RDD)弹性分布式数据集 // 可以简单理解为分布式的集合,但是Spark对它做了很多的封装 // 让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了 val fileRDD: RDD[String] = sc.textFile("G:\\2020干货\\Spark\\wordcount.txt") // 3.处理数据 // 3.1 对每一行数据按空格切分并压平形成一个新的集合中 // flatMap是对集合中的每一个元素进行操作,再进行压平 val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) // 3.2 每个单词记为1 val wordAndOneRDD: RDD[(String, Int)] = wordRDD.map((_,1)) // 3.3 根据key进行聚合,统计每个单词的数量 // wordAndOneRDD.reduceByKey((a,b)=>a+b) // 第一个_: 之前累加的结果 // 第二个_: 当前进来的数据 val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_) // 4. 收集结果 val result: Array[(String, Int)] = wordAndCount.collect() // 控制台打印结果 result.foreach(println) } }

运行的结果:

在这里插入图片描述在这里插入图片描述集群上运行代码语言:javascript复制package com.czxy.scala import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /* * @Auther: Alice菌 * @Date: 2020/2/19 09:12 * @Description: 流年笑掷 未来可期。以梦为马,不负韶华! */ /** * 集群运行 */ object Spark_wordcount_cluster { def main(args: Array[String]): Unit = { // 1. 创建SparkContext val config = new SparkConf().setAppName("wc") val sc = new SparkContext(config) sc.setLogLevel("WARN") // 2. 读取文件 // A Resilient Distributed Dataset (RDD) 弹性分布式数据集 // 可以简单理解为分布式的集合,但是spark对它做了很多的封装 // 让程序员使用起来就像操作本地集合一样简单,这样大家就很happy了 val fileRDD: RDD[String] = sc.textFile(args(0)) // 文件输入路径 // 3. 处理数据 // 3.1对每一行数据按照空格进行切分并压平形成一个新的集合 // flatMap是对集合中的每一个元素进行操作,再进行压平 val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" ")) // 3.2 每个单词记为1 val wordAndOneRDD = wordRDD.map((_,1)) // 3.3 根据key进行聚合,统计每个单词的数量 // wordAndOneRDD.reduceByKey((a,b)=>a+b) // 第一个_:之前累加的结果 // 第二个_:当前进来的数据 val wordAndCount: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_) wordAndCount.saveAsTextFile(args(1)) // 文件输出路径 } }打包在这里插入图片描述在这里插入图片描述上传 在这里插入图片描述在这里插入图片描述执行命令提交到Spark-HA集群代码语言:javascript复制/export/servers/spark/bin/spark-submit \ --class cn.itcast.sparkhello.WordCount \ --master spark://node01:7077,node02:7077 \ --executor-memory 1g \ --total-executor-cores 2 \ /root/wc.jar \ hdfs://node01:8020/wordcount/input/words.txt \ hdfs://node01:8020/wordcount/output4执行命令提交到YARN集群代码语言:javascript复制/export/servers/spark/bin/spark-submit \ --class cn.itcast.sparkhello.WordCount \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 2 \ --queue default \ /root/wc.jar \ hdfs://node01:8020/wordcount/input/words.txt \ hdfs://node01:8020/wordcount/output5

这里我们提交到YARN集群

在这里插入图片描述在这里插入图片描述

运行结束后在hue中查看结果

在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述Java8版[了解]

Spark是用Scala实现的,而scala作为基于JVM的语言,与Java有着良好集成关系。用Java语言来写前面的案例同样非常简单,只不过会有点冗长。

代码语言:javascript复制package com.czxy.scala; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.util.Arrays; /** * @Auther: Alice菌 * @Date: 2020/2/21 09:48 * @Description: 流年笑掷 未来可期。以梦为马,不负韶华! */ public class Spark_wordcount_java8 { public static void main(String[] args){ SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[*]"); JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD fileRDD = jsc.textFile("G:\\2020干货\\Spark\\wordcount.txt"); JavaRDD wordRDD = fileRDD.flatMap(s -> Arrays.asList(s.split(" ")).iterator()); JavaPairRDD wordAndOne = wordRDD.mapToPair(w -> new Tuple2(w, 1)); JavaPairRDD wordAndCount = wordAndOne.reduceByKey((a, b) -> a + b); //wordAndCount.collect().forEach(t->System.out.println(t)); wordAndCount.collect().forEach(System.out::println); //函数式编程的核心思想:行为参数化! } }

运行后的结果是一样的。

在这里插入图片描述在这里插入图片描述

本次的分享就到这里,受益的小伙伴或对大数据技术感兴趣的朋友记得点赞关注Alice哟(^U^)ノ~YO



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3