原文刊自作者博客Spark 基础概念解读,有更新
本文介绍 Spark 中涉及到的几个基础概念相关名词及其在 Spark 程序(即用户程序, Application)运行过程中的作用,初探 Spark 程序的运行原理。
上图是源自 Spark 官方文档对 Spark 程序运行架构的高度抽象,下文将围绕该图示进行阐述。
概念解读
Cluster Manager
Spark 程序在集群上运行时,需要集群管理者 (Cluster Manager) 为程序分配其运行所需要的资源。Spark 程序是通过一个 SparkContext 对象向 Cluster Manager 发起资源请求的,目前支持的 Cluster Manager 有 Spark standalone cluster manager, Mesos, YARN 或 Kubernetes,我们的集群是使用 YARN 管理的。
Worker
Worker 是和 Cluster Manager 一个范畴的概念,即 Cluster Manager 所管理的集群中可用于计算或存储的主机/虚拟机节点。
Driver program
运行 Spark 程序 main 函数的进程,负责创建 SparkContext 对象进而能向 Cluster Manager 发起资源请求。该进程运行的主机节点是 Driver 节点。
当 Spark 程序以 yarn client 模式提交时,负责提交 Spark 程序的主机 (client) 作为 Driver 节点;
当 Spark 程序以 yarn cluster 模式运行时,运行 spark-submit 脚本的节点 (client) 负责将 Spark 程序和参数传到集群,由 yarn ResourceManager 选取一个 NodeManager 启动 Application Master 进程,并由 Application Master 进程调用 Spark 程序 main 函数,创建 SparkContext 并通过 Application Master 向 Cluster Manager 发起资源请求。
从深层次的含义讲,yarn-cluster 和 yarn-client 模式的区别其实就是 Application Master 进程的区别,yarn-cluster 模式下,driver 运行在 AM (Application Master) 中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN 上运行。然而 yarn-cluster 模式不适合运行交互类型的作业。
而 yarn-client 模式下,Application Master 进程仅仅向 YARN 请求 executor,driver 进程运行在 client 上并与分配的 container 通信来调度他们工作,也就是说 client 要一直存活直到整个 Spark 程序运行完成。
Executor
一个运行在 worker node 上的 JVM 进程。Worker 由 cluster manager 分配给 Spark 程序,启动相应的 JVM 进程,Spark 程序将 task 交由该进程执行,该进程将处理 task 的数据,根据需要保存在内存或硬盘,以便在 task 之间进行共享。每一个 Spark 程序拥有很多个其独享的 executor(分布在多个 Worker Node 上),每一个 executor 中多线程地执行同一个 Spark 程序的 task。 Cluster 模式下,Spark 程序的工作流程如图所示。
这里借用 Quora 上的一句话来总结上述几个概念间的关系。
When the Driver process needs resources to run jobs/tasks, it ask the “Master” for some resources. The “Master” allocates the resources and uses the “Workers” running through out the cluster to create “Executors” for the “Driver”. Then the Driver can run tasks in those “Executors”
Job
A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g. save, collect)。Spark 按照 action 操作将整个 Spark 程序划分为多个 Job。
Stage
Each job gets divided into smaller sets of tasks called stages that depend on each other。一个 Job 会被划分为一个或多个 Stage,每一次引发 shuffle 的操作都会被划分为一个 Stage,action 也会是一个 Stage。一个 Stage 的所有 task 都被执行完后,会将结果写入到磁盘(shuffle write),一方面减少内存压力,另一方面为了容错。
Task
A unit of work that will be sent to one executor。task 是 Spark 程序的执行单位,是 Stage 下的任务执行单元。task 的执行结果一般保存在执行该 task 的 executor 内存,但当结果大小超出设定的阈值时,会将结果保存在磁盘。
代码示例
下面结合一段示例代码来讲解。首先我们从 DATA.GOV 下载 1880-2017 新生婴儿名字,性别,人数统计数据作为测试数据,将其解压后上传到 hdfs 上。
hdfs dfs -put ./names /user/test/
hdfs dfs -ls /user/test/names/
一共 138 个 txt 文件,总大小 24M 左右,txt 文件中的每一行数据格式为名字,性别,新生人数。HDFS 的 block size 一般是 64M 或 128M,如果单个文件小于 block size,该文件会被分配一个 block。因此这 138 个小 txt 文件将分布在 138 个 hdfs block (但是单个文件并不会完全占据该 block 的存储空间)。如果某一个文件的大小大于 block size,那么它将被保存在多个 Block 上。 下面一段代码,统计所有姓名为 Mary 新生儿的人数,并将新生儿名字为 Mary 的行过滤出来保存为文件。
#!/usr/bin/env python3
# filename: mary.py
from pyspark import SparkConf
from pyspark.sql import SparkSession
from operator import add
if __name__ == '__main__':
conf = SparkConf()
spark = SparkSession.builder.appName('app_name').config(conf=conf).getOrCreate()
sc = spark.sparkContext
from_path="/user/test/names"
save_path="/user/test/mary/"
rdd = sc.textFile(from_path, 10)
res=rdd.map(lambda x: tuple(x.split(','))).filter(lambda x: x[0]=='Mary')
cnt=res.map(lambda x: int(x[2])).reduce(add)
print("Mary count::" + str(cnt))
res.repartition(2).saveAsTextFile(save_path)
mary.py 程序被划分为两个 Job,分别对应 reduce 操作和 saveAsTextFile 操作,reduce 操作包含一个 stage 即 reduce 本身;saveAsTextFile 操作分为两个 stage,分别是 repartition 和 saveAsTextFile 。map、filter 和 repartition 都是 transformation,并不触发提交 task 到 executor 执行。其中在 saveAsTextFile 引发的 Job 中,由于 repartition 引起了 shuffle,因此多出来一个 stage。 我们使用下面的 shell 脚本运行该 Spark 程序
#!/usr/bin/env bash
# filename: mary.sh
spark-submit \
--master yarn \
--deploy-mode client \
--name "get_mary" \
--conf spark.default.parallelism=54 \
--num-executors 6 \
--executor-cores 3 \
mary.py
deploy-mode 采用了 yarn client 模式,运行该脚本的机器作为 driver,yarn 作为 cluster manager。在代码中,我们并没有配置 hdfs 等相关项,默认使用的该机器上已经配置好的 hdfs、hive 和 spark 环境信息。本文我们并不讨论如何配置这些环境,可以使用一些命令查看这些配置,例如
# Get list of name nodes in the cluster
hdfs getconf -namenodes
# Gets a specific key from the configuration
hdfs getconf -confKey fs.defaultFS
hdfs getconf -confKey dfs.blocksize
hdfs getconf -confKey yarn.resourcemanager.address
More commands about hdfs, see hadoop hdfs command reference
以 saveAsTextFile 操作为例,上面程序的执行过程如下。
- driver 向 cluster manager 申请 executor,申请的个数由 –num-executors (or spark.executor.instances) 决定,但是不可以超过资源上限,如果不显式指定,则使用默认配置。得到 executor 后,spark 程序在相应的 executor 上启动 jvm,准备执行 task。
-
每一个 executor 的 jvm 程序被分配一个 task,具体到本程序的 saveAsTextFile 操作,第一个 task 是由 repartition 引发的 stage 的一个执行单元。 这个 task 做的事情分别是:读取某一个分区的数据到内存 -> 在内存中,对该分区数据执行
map(lambda x: tuple(x.split(’,’)))
操作 -> 在内存中,对结果执行filter(lambda x: x[0]==‘Mary’)
操作 -> 执行 repartition 操作。由于引起了 shuffle,repartition 是相对复杂的。spark 有多种 repartition 方式,默认使用 HashPartitioner 进行重新分区。
通过查看 Spark UI 上的运行日志,可以发现这一步一共有 138 个 task,亦即每一个文件作为一个分区被读入(textFile 里的第二个参数并没起作用,因为它的作用原本也不是指定读入的分区数),每一个分区数据被一个 task 单独处理。 实际上,task 的个数并不是完全由文件个数或文件包含的 Block 数决定,Spark 读取文件时,可能会将若干个 Block 合并为一个输入分片 InputSplit,InputSplit 不会跨越文件,InputSplit 与 task 个数是一一对应的,每个 task 的执行结果就是生成了目标 RDD 的一个 partition。因此当所有文件的大小都小于 Block size 时,初始的任务个数就和文件个数一样了,和我们例子中看到的一样。 数据读入阶段,输入文件被划分为多少 InputSplit 就会需要多少初始 task, 也就会产生多少 partition。map 操作不改变后续 RDD 的 partition 的数目。reduce 或其它的 shuffle 操作会导致新的 RDD 的 partition 的数目改变。关于 InputSplit 的划分的源码如下。原始文件的个数与 size、mapred.min.split.size
和dfs.block.size
都会影响 InputSplit 的划分。默认情况下,split size 是和 block size 一样的。 ``` // org.apache.hadoop.mapred.FileInputFormat public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { Stopwatch sw = new Stopwatch().start(); FileStatus[] files = listStatus(job);// Save the number of input files for metrics/loadgen job.setLong(NUM_INPUT_FILES, files.length); long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException(“Not a file: “+ file.getPath()); } totalSize += file.getLen(); }
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
// generate splits ArrayList
splits = new ArrayList (numSplits); NetworkTopology clusterMap = new NetworkTopology(); for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen(); if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis()); } return splits.toArray(new FileSplit[splits.size()]); }
protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }
``` 我们的 executor 个数为 6,每个 executor 有 3 个 core,任务并行数被设置为 54 (和 spark.default.parallelism 有关),也就是每个 executor 多线程地处理多个 task,处理完成后再继续执行下个 task,直到 138 个 task (分区数据) 全部处理完。然后 reducer 从内存(如果空间不足,则 task 结果可能会被放在磁盘中)开始读取每个 task 的结果并将其合并到 2 个分区(repartition(2) 参数 2 的意义),此时必然会发生网络 IO。完成后 repartition stage 的结果会写入磁盘,以供下一个 stage 使用。Spark 默认使用 32KB 的 memory buffer 存储 Shuffle 的中间结果,如果 buffer 满了就会写入磁盘,生成一个小文件,每个 Partition 的多个小文件会在map端处理结束后合并为一个大文件(大文件的总个数等于文件的分区数),这些临时文件会在对应的 application 结束后被删除。
- 执行 saveAsTextFile 操作。相应的 task 会从依次磁盘拉取上一个 stage 的临时结果,读入内存,然后写入磁盘,完成业务。
那么我们再看一下 reduce job,它也是基于 res 进行的进一步处理,但是 res 的还是要从读入 text 文件到 filter 重新执行一遍,如果想要 reduce 和 saveAsTextFile 对 res 的操作,我们可以执行 res.cache() 命令将 res 缓存。需要说明的是,cache 操作本身不会触发 job,只有到某个 job 执行并生成 res 后,它才会被缓存到内存(内存不够,则超出结果会被缓存到磁盘)上,其它的 job 用到 res 时,就会直接使用缓存的 res。
如果要更加系统的地学习 Spark 应用,官方文档是最佳选择。