神刀安全网

Spark 编程指南 (二) [Spark Programming Guide]

Spark的核心概念是弹性分布式数据集— Resilient Distributed Datasets ,这是一个具有容错能力并且可以进行并行计算的元素集合

对于RDD的基本概念,在 Spark 编程指南 (一) [Spark Programming Guide] 中有详细介绍

RDD的创建

用户可以通过两种方式创建RDD:

  • 并行化(Parallelizing)一个已经存在与驱动程序(Driver Program)中的集合(Collection),如set、list

  • 引用外部存储系统上的一个数据集,比如HDFS、HBase,或者任何提供了Hadoop InputFormat的数据源

并行集合(Parallelized Collections)

并行集合是在驱动程序中,由 SparkContext’s parallelize 方法从一个已经存在的迭代器或者集合中创建,集合中的元素会被复制到一个可以进行并行操作的分布式数据集中

例如:如下代码演示如何创建一个元素为1到5的并行数据集

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

这个数据集一旦创建,就可以被并行的操作,例如用下代码就可以对上面列表中元素进行叠加

distData.reduce(lambda a, b: a + b)

在并行集合中有一个重要的参数—分片数,表示数据集的切分片数;Spark会在集群中为每个分片启动一个任务(task),通常情况下你希望集群中的每个CPU都有2—4个分片,但Spark会根据集群情况自动分配分片数;然而,你也可以通过第二个参数手动设置分片数

sc.parallelize(data, 10)

外部数据集(External Datasets)

PySpark可以从Hadoop所支持的任何存储数据源中构建出分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3,Spark支持text files、SequenceFiles和任何Hadoop InputFormat

Text file RDDs可以通过 SparkContext’s textFile 方法创建,这个方法接收一个URI文件地址作为参数(或者是一个本地路径、hdfs://,s3n://等),并读取文件作为行的集合,下面是一个调用实例:

distFile = sc.textFile("data.txt")

一旦创建完成,distFile就可以执行数据集的相关操作。例如:要对文件中的所有行进行求和,就可以用map和reduce操作

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

Spark读取文件时的一些注意事项:

  • 如果用本地文件系统,该文件必须在其工作节点上的相同目录下也可以访问。也可以将文件拷贝到所有的workers节点上,或者使用network-mounted共享文件系统

  • Spark中所有基于文件的输入方法,包括textFile,都支持在目录上运行,压缩文件和通配符。如:可以使用 textFile(“/my/directory”), textFile(“/my/directory/ .txt”), 和 textFile(“/my/directory/ .gz”)

  • textFile方法也带有第二个可选参数,其作用是控制文件的分片数。默认情况下,Spark会为文件的每一个block(在HDFS中,block的默认大小为64MB)创建一个分片,或者你也可以通过传入更大的值,来设置更高的分片数,但要注意,你设置的分片数不能比文件的块数小

除了text files,Spark的Python API还支持其他的数据格式:

  • SparkContext.wholeTextFiles 可以让你读取包含多个小text files的目录,并且对每一个文件返回这样的元祖对(filename, content),而对于对应的textFile,文件的每一行对应着一条上述所说的返回元祖对

  • RDD.saveAsPickleFile 和 SparkContext.pickleFile支持将RDD保存成由pickled Python对象组成的简单格式,使用批处理的方式对pickle的对象进行序列化,默认的处理批次是10

  • SequenceFile 和 Hadoop Input/Output 的格式

注意:这个功能目前属于实验性质的,为高级用户而提供。在将来的版本中,可能会因为支持Spark SQL的读写而被取代,且Spark SQL的读写是首选方法

Writable支持

PySpark的SequenceFile支持加载Java中的键值对RDD(key-value),将Writable转换为基本的Java类型,并且通过Pyrolite在结果Java对象上执行pickles序列化操作。当将一个键值对的RDD保存为SequenceFIle时,PySpark会对其进行反操作。它会unpickles Python的对象为Java对象,然后再将它们转换为Writables。

下表中的Writables会被自动地转换:

Writable Type Python Type
Text unicode str
IntWritable int
FloatWritable float
DoubleWritable float
BooleanWritable bool
BytesWritable bytearray
NullWritable None
MapWritable dict

数组不支持开箱(out-of-the-box)操作。在读或写数组时,用户需要指定自定义的ArrayWritable子类。当写数组时,用户也需要指定自定义的转换器(converters),将数组转换为自定义的ArrayWritable子类。当读数组时,默认的转换器会将自定义的ArrayWritable子类转换为Java的Object[],然后被pickled成Python的元组。如果要获取Python中包含基本数据类型的数组—array.array的话,用户需要为该数组指定自定义的转换器。

保存和加载SequenFiles

同text files类似,SequenceFiles可以被保存和加载到指定路径。可以指定key-value的类型,但对标准的Writables类型则不需要指定

rdd = sc.parallelize(range(1,4)).map(lambda x: (x, "a" * x ))  
rdd.saveAsSequenceFile("path/to/file")
sorted(sc.sequenceFile("path/to/file").collect())

# [(1, u'a'), (2, u'aa'), (3, u'aaa')]

保存和加载其他的Hadoop输入/输出格式

PySpark也可以读、写任何Hadoop InputFormat,包括”新”、”旧”两种Hadoop MapReduce APIs。如果需要的话,可以将传递进来的一个Hadoop配置当成一个Python字典

以下是一个Elasticsearch ESInputFormat的样例:

SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar./bin/pyspark

conf = {"es.resource" :"index/type"} # assumeElasticsearch is running on localhost defaults
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",/
"org.apache.hadoop.io.NullWritable","org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
rdd.first() # the result is a MapWritable that isconverted to a Python dict

# (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', 'field3': 12345})

注意:如果这个InputFormat只是简单地依赖于Hadoop配置和输入路径,以及key-value的类型,它就可以很容易地根据上面的表格进行转换,那么这种方法应该可以很好地处理这些情况

如果你有一个定制序列化的二进制数据(例如从Cassandra/HBase加载的数据),那么你首先要做的是用Scala/Java将数据转换为可供Pyrolite的pickler处理的数据,Converter特质提供了这一转换功能。简单地继承该特质,然后在convert方法中实现你自己的转换代码。记住要确保该类和访问InputFormat所需的依赖,都需要被打包到你的Spark作业的jar包,并且包含在PySpark的类路径中。

在Python样例和Converter样例上给出了带自定义转换器的Cassandra/HBase的InputFormat和OutputFormat使用样例。

RDD操作

RDDs支持两种操作:

  • 转换(transformations) ,可以从已有的数据集创建一个新的数据集

  • 动作(actions) ,在数据集上运行计算后,会向驱动程序返回一个值

例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新RDD来表示结果。另一方面,reduce是一种动作,通过一些函数将所有的元素聚合起来,并将最终结果返回给驱动程序(不过还有一个并行的reduceByKey,能返回一个分布式数据集)。

Spark中的所有转换都是 惰性 的,也就是说它们并不会马上执行得到结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。 只有当触发一个需要返回结果的动作给驱动程序时 ,这些转换才会真正执行,这种设计让Spark更加有效率地运行。例如,我们对map操作创建的数据集进行reduce操作时,只会向驱动返回reduce操作的结果,而不是返回更大的map操作创建的数据集。

默认情况下,每一个转换过的RDD都会在你对它执行一个动作时被重新计算。而然,你也可以使用持久化或者缓存方法,把一个RDD持久化到内存中。在这种情况下,Spark会在集群中保存相关元素,以便你下次查询这个RDD时能更快速地访问。对于把RDDs持久化到磁盘上,或在集群中复制到多个节点同样是支持的。

基础操作

为了描述RDD的基础操作,可以考虑下面的简单程序:

lines = sc.textFile("data.txt")  
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a+ b)

  • 第一行通过一个外部文件定义了一个基本的RDD。这个数据集未被加载到内存,也未执行操作:lines仅仅指向这个文件。
  • 第二行定义了lineLengths作为map转换结果。此外,由于惰性,不会立即计算lineLengths。
  • 最后,我们运行reduce,这是一个动作。这时候,Spark才会将这个计算拆分成不同的task,并运行在独立的机器上,并且每台机器运行它自己的map部分和本地的reducatin,仅仅返回它的结果给驱动程序

如果我们希望以后可以复用lineLengths,可以添加:

lineLengths.persist()

在reduce执行之前,这将导致lineLengths在第一次被计算之后,被保存在内存中

将函数传入Spark

Spark的API,在很大程度上依赖于把驱动程序中的函数传递到集群上运行

有三种推荐方法可以使用:

  • 使用Lambda表达式来编写可以写成一个表达式的简单函数(Lambdas不支持没有返回值的多语句函数或表达式)

  • Spark调用的函数中的Local defs,可以用来代替更长的代码

  • 模块中的顶级函数

例如,如果想传递一个支持使用lambda表达式的更长的函数,可以考虑以下代码:

"""MyScript.py"""  
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)

sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)

注意:由于可能传递的是一个类实例方法的引用(而不是一个单例对象(singleton object)),在传递方法的时候,应该同时传递包含该方法的对象。

比如:

class MyClass(object):  
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)

这里,如果我们创建了一个类实例new MyClass,并且调用了实例的doStuff方法,该方法中的map处调用了这个MyClass实例的func方法,所以需要将整个对象传递到集群中

类似地,访问外部对象的字段时将引用整个对象:

class MyClass(object):  
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + x)

为了避免这种问题,最简单的方式是把field拷贝到本地变量,而不是去外部访问它:

def doStuff(self, rdd):  
field= self.field
return rdd.map(lambda s: field + x)

理解闭包

关于Spark的一个更困难的问题是理解当在一个集群上执行代码的时候,变量和方法的范围以及生命周期。修改范围之外变量的RDD操作经常是造成混乱的源头。在下面的实例中我们看一下使用foreach()来增加一个计数器的代码,不过同样的问题也可能有其他的操作引起。

实例

考虑下面的单纯的RDD元素求和,根据是否运行在同一个虚拟机上,它们表现的行为完全不同。一个简单的例子是在local模式(–master=local[n])下运行Spark对比将Spark程序部署到一个集群上(例如通过spark-submit提交到YARN)

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

本地模式 VS 集群模式

上述代码的行为是未定义的,不能按照预期执行。为了执行作业,Spark将RDD操作拆分成多个task,每个任务由一个执行器操作。在执行前,Spark计算闭包。闭包是指执行器要在RDD上进行计算时必须对执行节点可见的那些变量和方法(在这里是foreach())。这个闭包被序列化并发送到每一个执行器。

闭包中的变量被发送到每个执行器都是被拷贝的,因此,当计数器在foreach函数中引用时,它不再是驱动节点上的那个计数器了。在驱动节点的内存中仍然有一个计数器,但它对执行器来说不再是可见的了。执行器只能看到序列化闭包中的拷贝。因此,计数器最终的值仍然是0,因为所有在计数器上的操作都是引用的序列化闭包中的值。

在这种情况下要确保一个良好定义的行为,应该使用Accumulator。Spark中的累加器是一个专门用来在执行被分散到一个集群中的各个工作节点上的情况下安全更新变量的机制。本指南中的累加器部分会做详细讨论。

一般来说,闭包-构造像循环或者本地定义的方法,不应该用来改变一些全局状态。Spark没有定义或者是保证改变在闭包之外引用的对象的行为。一些这样做的代码可能会在local模式下起作用,但那仅仅是个偶然,这样的代码在分布式模式下是不会按照期望工作的。如果需要一些全局的参数,可以使用累加器。

打印RDD中的元素

另一个常见的用法是使用 rdd.foreach(println) 方法或者 rdd.map(println) 方法试图打印出RDD中的元素。

在单台机器上,这样会产生期望的输出并打印出RDD中的元素。然而,在集群模式中,被执行器调用输出到stdout的输出现在被写到了执行器的stdout,并不是在驱动上的这一个,因此驱动上的stdout不会显示这些信息。

在驱动上打印所有的元素,可以使用collect()方法首先把RDD取回到驱动节点如:

rdd.collect().foreach(println)

然而,这可能导致驱动内存溢出,因为collect()将整个RDD拿到了单台机器上;如果你只需要打印很少几个RDD的元素,一个更安全的方法是使用take()方法:

rdd.take(100).foreach(println)

使用键值对(key-value)

虽然在包含任意类型的对象的RDDs中,可以使用大部分的Spark操作,但也有一些特殊的操作只能在键值(key-value)对的 RDDs上使用

最常见的一个就是分布式的”shuffle”操作,诸如基于key值对元素进行分组或聚合的操作

在Python中,RDDs支持的操作包含Python内置的元组(tuples)操作,比如 (1, 2)。你可以简单地创建这样的元组,然后调用期望的操作

例如,下面的代码在键值(key-value)对上使用 reduceByKey操作来计算在一个文件中每行文本出现的总次数:

lines = sc.textFile("data.txt")  
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我们也可以使用 counts.sortByKey(),例如,按照字典序(alphabetically)排序键值对。最后调用counts.collect()转换成对象的数组形式,返回给驱动程序(driver program)

转换操作

下表中列出了 Spark支持的一些常见的转换。详情请参考RDD API文档 (Scala, Java, Python)和 pair RDD函数文档 (Scala, Java)

Transformation Meaning
map(func) 返回一个新分布式数据集,由每一个输入元素经过func函数计算后组成
filter(func) 返回一个新数据集,由经过func函数计算后返回值为true的元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此func应该返回一个序列(Seq),而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分区(对应块block)上运行,当在类型为T的RDD上运行时,func的函数类型必须是Iterator => Iterator
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分区(partition)的索引值。当在类型为T的RDD上运行时, func的函数类型必须是(Int, Iterator ) => Iterator
sample(withReplacement, fraction, seed) 根据fraction指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 返回一个新的数据集,新数据集由源数据集和参数数据集的元素联合(union)而成
intersection(otherDataset) 返回一个新的数据集,新数据集由源数据集和参数数据集的元素的交集(intersection)组成}
distinct([numTasks])) 返回一个新的数据集,新数据集由源数据集过滤掉多余的重复元素而成
groupByKey([numTasks]) 在一个 (K, V)对的数据集上调用,返回一个 (K, Iterable

)对的数据集

注意 :如果你想在每个key上分组执行聚合(如总和或平均值)操作,使用reduceByKey或combineByKey会产生更好的性能

注意

:默认情况下,输出的并行数依赖于父RDD(parent RDD)的分区数(number of partitions)。你可以通过传递可选的第二个参数numTasks来设置不同的任务数

reduceByKey(func, [numTasks]) 在一个 (K, V)对的数据集上调用时,返回一个 (K, V)对的数据集,使用指定的reduce函数func将相同 key的值聚合到一起,该函数的类型必须是(V,V) => V。类似groupByKey,reduce的任务个数是可以通过第二个可选参数来配置的
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 在一个 (K, V)对的数据集上调用时,返回一个(K, U)对的数据集,对每个键的值使用给定的组合函数(combine functions)和一个中性的”零”值进行聚合。允许聚合后的值类型不同于输入的值类型,从而避免了不必要的内存分配。如同groupByKey,可以通过设置第二个可选参数来配置 reduce任务的个数
sortByKey([ascending], [numTasks]) 在一个 (K, V)对的数据集上调用,其中,K必须实现Ordered,返回一个按照Key进行排序的(K, V)对数据集,升序或降序由布尔参数ascending决定
join(otherDataset, [numTasks]) 在类型为(K, V)和(K, W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))对的数据集。也支持外联(Outer joins),通过使用leftOuterJoin和rightOuterJoin
cogroup(otherDataset, [numTasks]) 在类型为(K, V)和(K, W)的数据集上调用,返回一个(K, Iterable , Iterable )元组(tuples的数据集。这个操作也可以称之为groupWith
cartesian(otherDataset) 笛卡尔积,在类型为T和U类型的数据集上调用时,返回一个(T, U)对的数据集(所有元素交互进行笛卡尔积)
pipe(command, [envVars]) 以管道(Pipe)方式将RDD的各个分区(partition)传递到shell命令,比如一个Perl或bash脚本中。RDD的元素会被写入进程的标准输入(stdin),并且将作为字符串的RDD(RDD of strings),在进程的标准输出(stdout)上输出一行行数据
coalesce(numPartitions) 把RDD的分区数降低到指定的numPartitions。过滤掉一个大数据集之后再执行操作会更加有效
repartition(numPartitions) 随机地对RDD的数据重新洗牌(Reshuffle),以便创建更多或更少的分区,对它们进行平衡。总是对网络上的所有数据进行洗牌(shuffles)
repartitionAndSortWithinPartitions(partitioner) 根据给定的分区器对RDD进行重新分区,在每个结果分区中,将记录按照key值进行排序。这在每个分区中比先调用repartition再排序效率更高,因为它可以推动排序到分牌机器上

动作

下表中列出了 Spark支持的一些常见的动作(actions)。详情请参考 RDD API文档(Scala,Java, Python) 和pair RDD函数文档(Scala, Java)

Action Meaning
reduce(func) 通过函数func(接受两个参数,返回一个参数),聚集数据集中的所有元素。该函数应该是可交换和可结合的,以便它可以正确地并行计算
collect() 在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作,并返回一个足够小的数据子集后再使用会比较有用
count() 返回数据集的元素的个数
first() 返回数据集的第一个元素。 (类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组。注意,这个操作目前不能并行执行,而是由驱动程序(driver program)计算所有的元素
takeSample(withReplacement,num, [seed]) 返回一个数组,由数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,可以指定可选参数seed,预先指定一个随机数生成器的种子
takeOrdered(n, [ordering]) 返回一个由数据集的前 n个元素,并使用自然顺序或定制顺序对这些元素进行排序
saveAsTextFile(path) 将数据集的元素,以text file(或text file的集合)的形式,保存到本地文件系统的指定目录,Spark会对每个元素调用 toString方法,然后转换为文件中的文本行
saveAsSequenceFile(path)(Java and Scala) 将数据集的元素,以Hadoop sequencefile的格式,保存到各种文件系统的指定路径下,包括本地系统, HDFS或者任何其它hadoop支持的文件系统。该方法只能用于键值(key-value)对的RDDs,或者实现了Hadoop的Writable接口的情况下。在 Scala中,也可以用于支持隐式转换为Writable的类型。(Spark包括了基本类型的转换,例如 Int,Double String,等等)
saveAsObjectFile(path)(Java and Scala) 以简单地Java序列化方式将数据集的元素写入指定的路径,对应的可以用SparkContext.objectFile()加载该文件
countByKey() 只对(K,V)类型的RDD有效。返回一个 (K,Int)对的hashmap,其中(K,Int)对表示每一个 key对应的元素个数
foreach(func) 在数据集的每一个元素上,运行 func函数。这通常用于副作用(sideeffects),例如更新一个累加器变量(accumulator variable)(参见下文),或者和外部存储系统进行交互

Shuffle操作

Spark触发一个事件后进行的一些操作成为Shuffle 。Shuffle是Spark重新分配数据的机制,这样它就可以跨分区分组。这通常涉及在执行器和机器之间复制数据,这就使得Shuffle是一个复杂和高代价的操作。

背景

为了理解在洗牌的时候发生了什么,我们可以考虑reduceByKey操作的例子。reduceByKey操作产生了一个新的RDD,在这个RDD中,所有的单个的值被组合成了一个元组,key和执行一个reduce函数后的结果中与这个key有关的所有值。面临的挑战是一个key的所有的值并不都是在同一个分区上的,甚至不是一台机器上的,但是他们必须是可连接的以计算结果

在Spark中,数据一般是不会跨分区分布的,除非是在一个特殊的地方为了某种特定的目的。在计算过程中,单个任务将在单个分区上操作—因此,为了组织所有数据执行单个reduceByKey中的reduce任务,Spark需要执行一个all-to-all操作

它必须读取所有分区,找到所有key的值,并跨分区把这些值放到一起来计算每个key的最终结果—这就叫做Shuffle

尽管在每个分区中新的Shuffle的元素集合是确定性的,分区本身的顺序也同样如此,这些元素的顺序就不一定是了。如果期望在Shuffle后获得可预测的有序的数据,可以使用:

  • mapPartitions来排序每个分区,例如使用,.sorted
  • repartitionAndSortWithinPartitions在重新分区的同时有效地将分区排序
  • sortBy来创建一个全局排序的RDD

可以引起Shuffle的操作有重分区例如repartition和coalesce,‘ByKey操作(除了计数)像groupByKey和reduceByKey,还有join操作例如cogroup和join

性能影响

Shuffle是一个代价高昂的操作,因为它调用磁盘I/O,数据序列化和网络I/O。要组织shuffle的数据,Spark生成一个任务集合—map任务来组织数据,并使用一组reduce任务集合来聚合它。它的命名来自与MapReduce,但并不直接和Spark的map和reduce操作相关

在内部,单个的map任务的结果被保存在内存中,直到他们在内存中存不下为止。然后,他们基于目标分区进行排序,并写入到一个单个的文件中。在reduce这边,任务读取相关的已经排序的块(blocks)

某些shuffle操作会消耗大量的堆内存,因为他们用在内存中的数据结构在转换操作之前和之后都要对数据进行组织。特别的,reduceByKey和aggregateByKey在map侧创建这些结构,‘ByKey操作在reduce侧生成这些结构。当数据在内存中存不下时,Spark会将他们存储到磁盘,造成额外的磁盘开销和增加垃圾收集(GC)

Shuffle也会在磁盘上产生大量的中间文件。在Spark1.3中,这些文件直到Spark停止运行时才会从Spark的临时存储中清理掉,这意味着长时间运行Spark作业会消耗可观的磁盘空间。这些做了之后如果lineage重新计算了,那shuffle不需要重新计算了。在配置Spark上下文时,临时存储目录由spark.local.dir配置参数指定

Shuffle的行为可以通过调整各种配置参数来调整。请看Spark配置指南中的Shuffle Behavior部分

RDD的持久化

Spark最重要的一个功能,就是在不同操作间,将一个数据集持久化(persisting) (或缓存caching)到内存中。当你持久化(persist)一个 RDD,每一个节点都会把它计算的所有分区(partitions)存储在内存中,并在对数据集 (或者衍生出的数据集)执行其他动作(actioins)时重用。这将使得后续动作(actions)的执行变得更加迅速(通常快10倍)。缓存(Caching)是用 Spark 构建迭代算法和快速地交互使用的关键

你可以使用persist()或cache()方法来持久化一个RDD。在首次被一个动作(action)触发计算后,它将会被保存到节点的内存中。Spark的缓存是带有容错机制的,如果 RDD丢失任何一个分区的话,会自动地用原先构建它的转换(transformations)操作来重新进行计算

此外,每一个被持久化的RDD都可以用不同的存储级别(storage level)进行存储,比如,允许你持久化数据集到硬盘,以序列化的Java对象(节省空间)存储到内存,跨节点复制,或者以off-heap的方式存储在Tachyon

这些级别的选择,是通过将一个StorageLevel对象 (Scala, Java, Python)传递到persist()方法中进行设置的。cache()方法是使用默认存储级别的快捷方法,也就是 StorageLevel.MEMORY_ONLY (将反序列化 (deserialized)的对象存入内存)

完整的可选存储级别如下:

Storage Level Meaning
MEMORY_ONLY 将RDD以反序列化(deserialized)的Java对象存储到JVM。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。这是默认的级别
MEMORY_AND_DISK 将RDD以反序列化(deserialized)的Java对象存储到JVM。如果RDD不能被内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取
MEMORY_ONLY_SER 将RDD以序列化(serialized)的Java对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化(deserialized)的空间利用率更高,尤其当使用快速序列化器(fast serializer),但在读取时会比较耗CPU
MEMORY_AND_DISK_SER 类似于MEMORY_ONLY_SER,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算
DISK_ONLY 只将RDD分区存储在硬盘上
MEMORY_ONLY_2,MEMORY_AND_DISK_2, etc. 与上述的存储级别一样,但是将每一个分区都复制到两个集群节点上
OFF_HEAP (experimental) 以序列化的格式 (serialized format) 将RDD存储到Tachyon。相比于MEMORY_ONLY_SER,OFF_HEAP降低了垃圾收集(GC)的开销,并使 executors变得更小而且共享内存池,这在大堆(heaps)和多应用并行的环境下是非常吸引人的。而且,由于RDDs驻留于Tachyon中,executor的崩溃不会导致内存中的缓存丢失。在这种模式下, Tachyon中的内存是可丢弃的。因此,Tachyon不会尝试重建一个在内存中被清除的分块

注意:在Python中,存储对象时总是使用Pickle库来序列化(serialized),而不管你是否选择了一个序列化的级别

Spark也会自动地持久化一些shuffle操作(比如,reduceByKey)的中间数据,即使用户没有调用persist。这么做是为了避免在一个节点上的shuffle过程失败时,重新计算整个输入。如果希望重用它的话,我们仍然建议用户在结果RDD上调用 persist

如何选择存储级别?

Spark的存储级别是在满足内存使用和CPU效率权衡上的不同需求

我们建议通过以下方法进行选择:

  • 如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快

  • 如果不行,试着使用MEMORY_ONLY_SER,并且选择一个快速序列化库使对象在有比较高的空间使用率(space-efficient)的情况下,依然可以较快被访问

  • 尽可能不要存储到硬盘上,除非计算数据集的函数的计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,可能和从硬盘中读取差不多快

  • 如果你想有快速的故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失的数据来恢复错误的容错机制,但是复制的存储级别可以让你在RDD 上持续地运行任务,而不需要等待丢失的分区被重新计算

  • 在大量的内存或多个应用程序的环境下,试验性的OFF_HEAP模式具有以下几个优点:

    • 允许多个 executors共享 Tachyon中相同的内存池

    • 极大地降低了垃圾收集器(garbage collection)的开销

    • 即使个别的 executors崩溃了,缓存的数据也不会丢失

移除数据

Spark会自动监控各个节点上的缓存使用情况,并使用最近最少使用算法(least-recently-used (LRU))删除老的数据分区

如果你想手动移除一个RDD,而不是等它自动从缓存中清除,可以使用RDD.unpersist()方法

参考: Spark Programming Guide 官方文档

转载请注明出处

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » Spark 编程指南 (二) [Spark Programming Guide]

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址