Apache Spark Source Code Walkthrough, Part 7: Analyzing the Standalone Deployment Mode

Part 2 of this Spark source walkthrough series mentioned that Spark can run a cluster in Standalone mode, but it did not analyze in detail how an Application is submitted or how it actually runs. This post covers those questions in some depth and also explains how HA is implemented in standalone mode.

Standalone mode without HA

Let's start with the simpler case. "Without HA" here means that the Master node has no HA.

A cluster is made up of two kinds of components: the Master and the Workers. There can be one or more slave workers, and all of them are in the active state.

The Driver Application can run either inside or outside the cluster. Let's start with the simpler case, where the Driver Application runs independently of the cluster. The overall setup then consists of the driver, the master, and multiple slave workers, which together form the runtime environment.

Execution sequence

Step 1: start the master

$SPARK_HOME/sbin/start-master.sh

The key line in start-master.sh is

"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

Check the Master's JVM process:

root     23438     1 67 22:57 pts/0    00:00:05 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080

The Master's log files live in the $SPARK_HOME/logs directory.

Step 2: start the worker (more than one worker can be started)

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

At startup, the worker registers itself with the specified master URL, here spark://localhost:7077.

When the Master receives the RegisterWorker message, it handles it with the following code:

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress)
        }
      }
    }

Step 3: run spark-shell

MASTER=spark://localhost:7077 $SPARK_HOME/bin/spark-shell

spark-shell is itself an application; the application's runtime logs are stored under each worker's $SPARK_HOME/work directory.

Since spark-shell is an application, the branch taken on the Master side is RegisterApplication; the handling code is shown below.

case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, sender)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }

Whenever a new application registers with the master, the master invokes the schedule function to assign the application to suitable workers and have each of those workers launch the corresponding ExecutorBackend. For the details see the schedule function in Master.scala; the code is not repeated here, but a simplified sketch of the decision it makes follows.
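The sketch below is a minimal, self-contained illustration of the kind of greedy resource assignment schedule() performs. All names here (ScheduleSketch, WorkerSlot, AppDemand) are made up for illustration; the real function additionally launches waiting drivers and supports a spread-out versus consolidate placement policy.

object ScheduleSketch {
  // Hypothetical stand-ins for the Master's WorkerInfo / ApplicationInfo bookkeeping.
  case class WorkerSlot(id: String, coresFree: Int, memoryFreeMb: Int)
  case class AppDemand(id: String, coresLeft: Int, memoryPerExecutorMb: Int)

  /** Greedily hand out free cores of the workers to applications that still need them. */
  def schedule(workers: Seq[WorkerSlot], apps: Seq[AppDemand]): Seq[(String, String, Int)] = {
    val coresFree = scala.collection.mutable.Map(workers.map(w => w.id -> w.coresFree): _*)
    val assignments = scala.collection.mutable.ArrayBuffer[(String, String, Int)]()
    for (app <- apps) {
      var remaining = app.coresLeft
      for (w <- workers if remaining > 0 && w.memoryFreeMb >= app.memoryPerExecutorMb) {
        val granted = math.min(coresFree(w.id), remaining)
        if (granted > 0) {
          // this is the point where the real Master would send LaunchExecutor to the worker
          assignments += ((app.id, w.id, granted))
          coresFree(w.id) -= granted
          remaining -= granted
        }
      }
    }
    assignments.toSeq
  }

  def main(args: Array[String]): Unit = {
    val workers = Seq(WorkerSlot("worker-1", 4, 2048), WorkerSlot("worker-2", 4, 2048))
    val apps = Seq(AppDemand("app-20140511230059-0000", 6, 512))
    schedule(workers, apps).foreach { case (app, worker, cores) =>
      println(s"launch an executor of $app on $worker with $cores cores")
    }
  }
}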

Step 4: check the result

/opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
root     23752 23745 21 23:00 pts/0    00:00:25 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.repl.Main
root     23986 23938 25 23:02 pts/2    00:00:03 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077
root     24047 23986 34 23:02 pts/2    00:00:04 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler 0 localhost 4 akka.tcp://sparkWorker@localhost:53568/user/Worker app-20140511230059-0000

From the relationships between these running processes we can see that once the worker-master connection has been established, whenever a new driver application connects to the master, the master asks the worker to launch a corresponding ExecutorBackend process. Any tasks submitted afterwards run on those executors. The following log entries confirm this conclusion; reading the source, of course, leads to the same result.

14/05/11 23:02:36 INFO Worker: Asked to launch executor app-20140511230059-0000/0 for Spark shell
14/05/11 23:02:36 INFO ExecutorRunner: Launch command: "/opt/java/bin/java" "-cp" ":/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler" "0" "localhost" "4" "akka.tcp://sparkWorker@localhost:53568/user/Worker" "app-20140511230059-0000"

The worker-side source for launching the executor lives in the worker's receive function; the relevant code is shown below.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
            self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
            workDir, akkaUrl, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
          }
        } catch {
          case e: Exception => {
            logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            masterLock.synchronized {
              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
            }
          }
        }
      }

For the standalone deployment, the source files that deserve a detailed read are listed below.

  • deploy/master/Master.scala
  • deploy/worker/Worker.scala
  • executor/CoarseGrainedExecutorBackend.scala

To view the parent-child relationships between these processes, use pstree.

To summarize the single-Master deployment: one Master process, one or more Worker processes registered with it, and per-application CoarseGrainedExecutorBackend processes launched by the workers.

Dynamic class loading and reflection

Before discussing how the Driver is deployed onto the cluster, let's first review one of Java's major features: dynamic class loading and reflection. I have not spent my whole career writing Java, so a lot of this was picked up on the job and omissions are inevitable.

Reflection, in essence, is about loading classes dynamically at run time.

Here is a simple example:

package test;

public class Demo {
    public Demo() {
        System.out.println("Hi!");
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        Class clazz = Class.forName("test.Demo");
        Demo demo = (Demo) clazz.newInstance();
    }
}

This naturally brings up a classic interview question: what is the difference between Class.forName and ClassLoader.loadClass? (The short answer: Class.forName both loads and initializes the class, so its static initializers run, while ClassLoader.loadClass only loads it.) When it comes to interviews I never feel confident; interviewers always seem so formidable. :)
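This mechanism is exactly what makes the next section possible: a worker can start a driver whose main class it only knows as a string. Below is a minimal sketch (the object name ReflectiveLaunch is hypothetical) of the reflective launch pattern that org.apache.spark.deploy.worker.DriverWrapper, which shows up in the submission code later in this post, relies on: load a class by name, look up its static main method, and invoke it.

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // args(0) is a fully qualified class name, e.g. "test.Demo" from the example above
    val clazz = Class.forName(args(0))                  // loads and initializes the class
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    val childArgs: Array[String] = args.drop(1)         // remaining args go to the loaded class
    mainMethod.invoke(null, childArgs)                  // null receiver: main is static
  }
}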

Running the Driver Application inside the cluster

The previous section on dynamic class loading and reflection was written precisely to lay the groundwork for this one.

When the Driver application is deployed into the cluster, the startup sequence is roughly as follows.

  • First start the Master, then start the Worker(s)
  • Use "deploy.Client" to submit the Driver Application to the cluster

./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] \
   <cluster-url> <application-jar-url> <main-class> \
   [application-options]

  • After the Master receives the driver submission request (RequestSubmitDriver), it sends LaunchDriver to a worker, asking it to start a JVM process for the Driver
  • As the Driver Application starts running in the newly created JVM process, it registers itself with the master by sending RegisterApplication
  • The Master sends LaunchExecutor to the Workers, asking them to start JVM processes running ExecutorBackend
  • Once the ExecutorBackends are up, the Driver Application can submit tasks to them for execution via the LaunchTask message (the whole exchange is sketched right after this list)
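To make the exchange concrete, here is the same sequence written out as plain Scala case classes. The message names follow the ones used above, but these definitions are simplified stand-ins for illustration, not the actual DeployMessages, and the identifiers used in main are placeholders.

object StandaloneHandshake {
  sealed trait Msg
  case class RequestSubmitDriver(mainClass: String) extends Msg      // Client -> Master
  case class LaunchDriver(driverId: String) extends Msg              // Master -> Worker
  case class RegisterApplication(appName: String) extends Msg        // Driver -> Master
  case class LaunchExecutor(appId: String, execId: Int) extends Msg  // Master -> Worker
  case class LaunchTask(taskId: Long) extends Msg                    // Driver -> ExecutorBackend

  def main(args: Array[String]): Unit = {
    val flow = Seq(
      ("Client", "Master", RequestSubmitDriver("my.app.Main")),
      ("Master", "Worker", LaunchDriver("driver-0")),
      ("Driver", "Master", RegisterApplication("Spark shell")),
      ("Master", "Worker", LaunchExecutor("app-20140511230059-0000", 0)),
      ("Driver", "ExecutorBackend", LaunchTask(0L))
    )
    flow.foreach { case (from, to, msg) => println(s"$from -> $to : $msg") }
  }
}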

The code on the submitting side is in deploy/Client.scala:

    driverArgs.cmd match {
      case "launch" =>
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        //       truncate filesystem paths similar to what YARN does. For now, we just require
        //       people call `addJar` assuming the jar is in the same directory.
        val env = Map[String, String]()
        System.getenv().foreach{case (k, v) => env(k) = v}
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }
        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }
        val javaOptionsConf = "spark.driver.extraJavaOptions"
        val javaOpts = sys.props.get(javaOptionsConf)
        val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
          driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
        masterActor ! RequestSubmitDriver(driverDescription)

The receiving side

Who receives the message sent out by deploy.Client? The answer is fairly obvious: the Master. The receive function in Master.scala has a dedicated branch for RequestSubmitDriver; the code is shown below.

case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
        sender ! SubmitDriverResponse(false, None, msg)
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()
        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".
        sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }

SparkEnv

SparkEnv is critical to every Spark job: it bundles the runtime services a process needs, and different roles pass different parameters when creating it. In particular, the Driver and the Executor create it quite differently.

In Executor.scala, the SparkEnv is created as follows:

  private val env = {
    if (!isLocal) {
      val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
        isDriver = false, isLocal = false)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env
    } else {
      SparkEnv.get
    }
  }

The Driver Application, by contrast, creates a SparkContext, and an important step during SparkContext construction is creating the SparkEnv, this time with isDriver = true and the driver's host and port:

 private[spark] val env = SparkEnv.create(
    conf,
    "<driver>",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal,
    listenerBus = listenerBus)
  SparkEnv.set(env)

HA implementation in Standalone mode

In standalone mode, Spark implements HA with the help of zookeeper. The HA discussed here targets the Master node specifically, because, as all of the analysis above shows, the Master is the only node in the cluster that can become a single point of failure.

Once zookeeper is brought in, the cluster consists of a zookeeper ensemble, one active (ALIVE) Master, one or more standby (STANDBY) Masters, and the Workers.

To use zookeeper, the Master must be started with additional options: edit conf/spark-env.sh and add them to SPARK_DAEMON_JAVA_OPTS, for example -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.101:2181. The available properties are:

  • spark.deploy.recoveryMode: set to ZOOKEEPER to enable standby Master recovery mode (default: NONE)
  • spark.deploy.zookeeper.url: the ZooKeeper cluster url (e.g. 192.168.1.100:2181,192.168.1.101:2181)
  • spark.deploy.zookeeper.dir: the directory in ZooKeeper used to store recovery state (default: /spark)

How the HA works

zookeeper provides a Leader Election mechanism, and HA can be built on top of it; for the specifics, see the zookeeper recipes documentation.

Spark does not call the zookeeper API directly; instead it goes through Curator, which wraps zookeeper and is friendlier to use.
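As an illustration of the recipe involved (a generic sketch, not Spark's actual ZooKeeperLeaderElectionAgent), the following uses Curator's LeaderLatch. It assumes curator-framework and curator-recipes are on the classpath; the object name, connection string, and znode path are placeholders, with the connection string borrowed from the property table above.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.{LeaderLatch, LeaderLatchListener}
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderElectionSketch {
  def main(args: Array[String]): Unit = {
    val zkUrl = "192.168.1.100:2181,192.168.1.101:2181"   // cf. spark.deploy.zookeeper.url
    val client = CuratorFrameworkFactory.newClient(zkUrl, new ExponentialBackoffRetry(1000, 3))
    client.start()

    // Every Master candidate creates a latch on the same znode; zookeeper grants
    // leadership to exactly one of them and re-elects when the leader goes away.
    val latch = new LeaderLatch(client, "/spark/leader_election")
    latch.addListener(new LeaderLatchListener {
      def isLeader(): Unit = println("elected: this master becomes ALIVE")
      def notLeader(): Unit = println("lost leadership: this master goes back to STANDBY")
    })
    latch.start()

    Thread.sleep(Long.MaxValue)   // a real master keeps serving until it is stopped
  }
}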

Summary

Step by step we have worked our way up to how HA is achieved with zookeeper in standalone mode. Along the way it has become clear that the standalone master's main jobs are resource management and job scheduling. Seeing those two responsibilities, you may well think: isn't that exactly the problem YARN is meant to solve? Right, in essence standalone is a simplified version of YARN.

The next post in this series will look in detail at how Spark is deployed on YARN.

References

  1. Spark Standalone Mode http://spark.apache.org/docs/latest/spark-standalone.html
  2. Cluster Mode Overview  http://spark.apache.org/docs/latest/cluster-overview.html

Source URL: http://www.cnblogs.com/hseagle/p/3673147.html