Spark on YARN: a source walkthrough of yarn-client mode


Spark version: 2.4.0

 

 

During the initialization of SparkContext, a task scheduler (TaskScheduler) is chosen according to the configured launch mode, and it is here that the differences between the modes are realized, through the concrete TaskScheduler type that gets selected.

case masterUrl =>
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }

The code above is from SparkContext's createTaskScheduler() method. When the yarn master is chosen, the corresponding ClusterManager is loaded here to create the TaskScheduler. In the yarn-client mode this article covers, a YarnScheduler and a YarnClientSchedulerBackend are created here as the schedulers that drive Spark task execution.
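For reference, the ClusterManager resolved for the "yarn" master URL is YarnClusterManager from the spark-yarn module; abridged, its selection logic looks roughly like this (imports omitted, details may differ slightly from 2.4.0):

private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext, masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}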

 

YarnScheduler simply extends TaskSchedulerImpl, the scheduler that would be chosen in local mode. Because in yarn-client mode the Driver runs locally just as in local mode, there is nothing special about the YarnScheduler implementation.
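In fact, in the 2.4.x line the whole class is roughly the following (abridged, imports omitted); all it adds on top of TaskSchedulerImpl is YARN rack resolution:

private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

  // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
  if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
    Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
  }

  // By default, rack is unknown
  override def getRackForHost(hostPort: String): Option[String] = {
    val host = Utils.parseHostPort(hostPort)._1
    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
  }
}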

The backend, by contrast, is where the interaction with YARN is implemented, so its implementation naturally differs quite a bit.

 

When the TaskScheduler actually starts up, the start() method of YarnClientSchedulerBackend initializes a YARN client and carries out the flow of registering and submitting the application to YARN's ResourceManager.

override def start() {
  val driverHost = conf.get("spark.driver.host")
  val driverPort = conf.get("spark.driver.port")
  val hostport = driverHost + ":" + driverPort
  sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }

  val argsArrayBuf = new ArrayBuffer[String]()
  argsArrayBuf += ("--arg", hostport)

  logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
  val args = new ClientArguments(argsArrayBuf.toArray)
  totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
  client = new Client(args, conf)
  bindToYarn(client.submitApplication(), None)

  // SPARK-8687: Ensure all necessary properties have already been set before
  // we initialize our driver scheduler backend, which serves these properties
  // to the executors
  super.start()
  waitForApplication()

  monitorThread = asyncMonitorApplication()
  monitorThread.start()
}

The above is YarnClientSchedulerBackend's start() method. Its two core steps are visible here: first a Client is constructed, which encapsulates the connection to YARN and the operations against it, and then the application is submitted through that initialized Client via its submitApplication() method.

 

Let's focus on Client's submitApplication() method.

yarnClient.init(hadoopConf)
yarnClient.start()

First, the yarnClient is initialized from the project's Hadoop configuration; all subsequent operations against YARN go through this yarnClient.

val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()

Next, createApplication() asks YARN for a new application. The newAppResponse obtained here contains not only the cluster's configuration and resource limits, but more importantly the appId assigned to the application for everything that follows; in yarn mode the application id is provided by YARN.
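A minimal standalone sketch of these same public YarnClient calls, stripped of Spark's surrounding logic (the object name and the printed fields are placeholders, not anything from Spark itself):

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object NewApplicationSketch {
  def main(args: Array[String]): Unit = {
    val yarnConf = new YarnConfiguration()          // picks up yarn-site.xml from the classpath
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(yarnConf)
    yarnClient.start()

    // Ask the ResourceManager for a new application; the response carries the assigned
    // ApplicationId plus cluster limits such as the maximum container memory.
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    val appId = newAppResponse.getApplicationId()
    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()

    println(s"new application id: $appId, max container memory: $maxMem MB")
    yarnClient.stop()
  }
}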

val containerContext = createContainerLaunchContext(newAppResponse)

The next step is an important one: createContainerLaunchContext() builds containerContext, the launch context for the YARN container that will host the ApplicationMaster.

 

 

All the important properties YARN needs in order to construct that container are assembled inside createContainerLaunchContext().

val appId = newAppResponse.getApplicationId
val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

Here, based on the configured HDFS settings, the submitting user, and the appId just obtained, the HDFS path is built under which the relevant jars and resources will later be uploaded.
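A hedged sketch of what this path construction amounts to; the ".sparkStaging" directory name matches Spark's convention, while the base directory and application id in the example are illustrative placeholders:

import org.apache.hadoop.fs.Path

// appStagingBaseDir normally defaults to the submitting user's home directory on HDFS.
def stagingDirFor(appStagingBaseDir: Path, appId: String): Path =
  new Path(appStagingBaseDir, new Path(".sparkStaging", appId))

// stagingDirFor(new Path("hdfs:///user/someuser"), "application_1234567890123_0001")
//   => hdfs:///user/someuser/.sparkStaging/application_1234567890123_0001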

val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)

val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)

With this path in hand, the local resources destined for HDFS are prepared and uploaded to the corresponding staging path, and the results are then attached to the AM container's launch context as its local resources and environment.
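A hedged sketch of what preparing a single local resource boils down to, using only public Hadoop/YARN APIs (the function name and paths are placeholders; Spark's prepareLocalResources additionally handles caching, visibility and many resource types):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.ConverterUtils

def stageJar(conf: Configuration, localJar: Path, stagingDir: Path): LocalResource = {
  val fs = stagingDir.getFileSystem(conf)
  val dest = new Path(stagingDir, localJar.getName)
  fs.copyFromLocalFile(false, true, localJar, dest)        // upload to the staging dir
  val status = fs.getFileStatus(dest)
  // Describe the uploaded file so the NodeManager can localize it into the container.
  LocalResource.newInstance(
    ConverterUtils.getYarnUrlFromPath(dest),
    LocalResourceType.FILE,
    LocalResourceVisibility.APPLICATION,
    status.getLen,
    status.getModificationTime)
}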

val javaOpts = ListBuffer[String]()

// Set the environment variable through a command prefix
// to append to the existing value of the variable
var prefixEnv: Option[String] = None

// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"

val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir

// TODO: Remove once cpuset version is pushed out.
// The context is, default gc for server class machines ends up using all cores to do gc -
// hence if there are multiple containers in same node, Spark GC affects all other containers'
// performance (which can be that of other Spark containers)
// Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
// multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
// of cores on a node.
val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
if (useConcurrentAndIncrementalGC) {
  // In our expts, using (default) throughput collector has severe perf ramifications in
  // multi-tenant machines
  javaOpts += "-XX:+UseConcMarkSweepGC"
  javaOpts += "-XX:MaxTenuringThreshold=31"
  javaOpts += "-XX:SurvivorRatio=8"
  javaOpts += "-XX:+CMSIncrementalMode"
  javaOpts += "-XX:+CMSIncrementalPacing"
  javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
  javaOpts += "-XX:CMSIncrementalDutyCycle=10"
}

The above is part of the construction of the java command-line options for the process that will be launched on YARN. Only a fragment of that logic is shown here; many more options are involved and the full code is considerably longer.

val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }

Notably, in yarn-client mode the class handed to YARN to act as the ApplicationMaster is ExecutorLauncher.

Everything mentioned above becomes part of the container launch context used when YARN starts the application.
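A hedged sketch of how these pieces end up on the AM container: the launch command is essentially a java invocation of amClass with the collected javaOpts, with stdout/stderr redirected into the container's log directory (the function name is a placeholder, and the real command Spark assembles carries more arguments, e.g. --class, --jar and --properties-file):

import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.util.Records
import scala.collection.JavaConverters._

def sketchAmContainer(amClass: String, javaOpts: Seq[String],
    driverHostPort: String): ContainerLaunchContext = {
  val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  val commands = Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
    javaOpts ++
    Seq(amClass, "--arg", driverHostPort,
      "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
  amContainer.setCommands(commands.asJava)
  amContainer
}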

val appContext = createApplicationSubmissionContext(newApp, containerContext)

Once the containerContext is built, createApplicationSubmissionContext() creates the appContext, and this application context is used directly to submit the app to YARN.

yarnClient.submitApplication(appContext)

Inside createApplicationSubmissionContext(), the submission is further packaged according to YARN's requirements, with the containerContext described earlier embedded as part of it; finally the application is submitted through yarnClient, which completes the submission.
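A hedged sketch of the submission-context fields that matter most here, using the public YARN API (the function name and values are placeholders, and Spark's real method sets quite a few more fields, such as application tags, maximum AM attempts and node-label expressions):

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClientApplication

def buildSubmissionContext(newApp: YarnClientApplication,
    containerContext: ContainerLaunchContext) = {
  val appContext = newApp.getApplicationSubmissionContext
  appContext.setApplicationName("my-spark-app")           // spark.app.name
  appContext.setQueue("default")                          // spark.yarn.queue
  appContext.setAMContainerSpec(containerContext)         // the launch context built above
  appContext.setApplicationType("SPARK")
  appContext.setResource(Resource.newInstance(1024, 1))   // AM memory (MB, incl. overhead) and cores
  appContext                                              // then: yarnClient.submitApplication(appContext)
}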

At this point, the yarn-client side has finished submitting the ApplicationMaster to YARN's ResourceManager.

 

After the submission, YARN first launches an ExecutorLauncher on one of the NodeManagers to communicate with the Spark side. Because this is yarn-client mode, work is then created in YARN according to the scheduling of the Driver running locally.

object ExecutorLauncher {

  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }

}

ExecutorLauncher is in fact implemented through ApplicationMaster, just as in yarn-cluster mode; the two modes are then distinguished inside ApplicationMaster's own logic.

In yarn-client mode, the main logic of ApplicationMaster lives in the runExecutorLauncher() method.

val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(
  RpcAddress(driverHost, driverPort),
  YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
createAllocator(driverRef, sparkConf)

In runExecutorLauncher(), a communication link to the Driver is established first, and a YarnAllocator is constructed, ready to request resources from YARN so that tasks can be executed.
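A hedged standalone sketch of what the allocator's dialogue with the ResourceManager boils down to, using the public AMRMClient API directly (the object name, host, sizes and priority are placeholders, and this only works when run inside an AM container launched by YARN; Spark's YarnAllocator layers locality-preference matching, blacklisting and executor bookkeeping on top of this):

import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.collection.JavaConverters._

object AllocatorSketch {
  def main(args: Array[String]): Unit = {
    val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amClient.init(new YarnConfiguration())
    amClient.start()
    amClient.registerApplicationMaster("localhost", 0, "")   // host/port/tracking URL are placeholders

    // One request per desired executor container: memory (MB) = executor memory + overhead.
    val capability = Resource.newInstance(2048, 2)
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(1)))

    // Heartbeat to the RM; the response carries any containers granted so far.
    val response = amClient.allocate(0.1f)
    response.getAllocatedContainers.asScala.foreach { container =>
      println(s"granted container ${container.getId} on ${container.getNodeId}")
    }

    amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "")
    amClient.stop()
  }
}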

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case r: RequestExecutors =>
    Option(allocator) match {
      case Some(a) =>
        if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
            r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
          resetAllocatorInterval()
        }
        context.reply(true)
      case None =>
        logWarning("Container allocator is not ready to request executors yet.")
        context.reply(false)
    }

  case KillExecutors(executorIds) =>
    logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
    Option(allocator) match {
      case Some(a) => executorIds.foreach(a.killExecutor)
      case None => logWarning("Container allocator is not ready to kill executors yet.")
    }
    context.reply(true)

  case GetExecutorLossReason(eid) =>
    Option(allocator) match {
      case Some(a) =>
        a.enqueueGetLossReasonRequest(eid, context)
        resetAllocatorInterval()
      case None =>
        logWarning("Container allocator is not ready to find executor loss reasons yet.")
    }
}

In its communication with the Driver, it keeps listening for the requests the Driver dispatches (such as RequestExecutors and KillExecutors above) and, based on them, asks YARN for the resources needed to run tasks.

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  // In cluster mode, do not rely on the disassociated event to exit
  // This avoids potentially reporting incorrect exit codes if the driver fails
  if (!isClusterMode) {
    logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
    finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
  }
}

This is also where yarn-client mode's behavior of shutting down the whole application when the local Driver stops is implemented: once the connection to the Driver is closed, the run on YARN is brought to an end.

 

 

Finally, back on the Driver side: the YarnClientSchedulerBackend discussed above extends YarnSchedulerBackend, and it is in YarnSchedulerBackend that requests are forwarded to the ApplicationMaster on YARN during the scheduling and execution phase.

/**
 * Request executors from the ApplicationMaster by specifying the total number desired.
 * This includes executors already pending or running.
 */
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
  yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
}

/**
 * Request that the ApplicationMaster kill the specified executors.
 */
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
  yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
}

In the end, all executor requests are sent from here over the network to the ApplicationMaster on YARN, which performs the remote allocation.
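A hedged usage example of how this path is exercised from user code: the developer-API calls below on SparkContext end up in doRequestTotalExecutors()/doKillExecutors() above and travel over RPC to the AM on YARN. The app name is a placeholder, and normally the master and deploy mode come from spark-submit rather than being set programmatically as done here to keep the sketch self-contained.

import org.apache.spark.{SparkConf, SparkContext}

object RequestExecutorsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("yarn-client-demo")
      .setMaster("yarn")
      .set("spark.submit.deployMode", "client")
    val sc = new SparkContext(conf)

    sc.requestExecutors(2)        // travels via doRequestTotalExecutors() to the AM on YARN
    sc.killExecutors(Seq("1"))    // travels via doKillExecutors() to the AM on YARN
    sc.stop()
  }
}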

 

This concludes the source-code walkthrough of yarn-client mode in Spark on YARN.


