Spark 累加器 Accumulator 和 广播变量的使用
使用场景:
在rdd的(Transformation)map转换数据中需要统计出现次数的手机号,用来标记这条数据为重复的数据。首先使用guava的HashMultiset 作为计数器。但是rdd的task在处理任务时,分片副本没有这个变量,接下来使用spark自带的累加器处理,并使用guava的计数器自定义一个累加器。
Accumulator简介
Accumulator是spark提供的累加器,顾名思义,该变量只能够增加。
只有driver能获取到Accumulator的值(使用value方法),Task只能对其做增加操作(使用 +=)。你也可以在为Accumulator命名(不支持Python),这样就会在spark web ui中显示,可以帮助你了解程序运行的情况。
Accumulato是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
自定义的MultisetAccumulator
class MultisetAccumulator extends AccumulatorV2[String, HashMultiset[String]] {
private val multiset = HashMultiset.create[String]
override def isZero: Boolean = {
multiset.isEmpty
}
override def copy(): AccumulatorV2[String, HashMultiset[String]] = {
val newAcc = new MultisetAccumulator()
multiset.synchronized {
newAcc.multiset.addAll(multiset)
}
newAcc
}
override def reset(): Unit = {
multiset.clear()
}
override def add(v: String): Unit = {
multiset.add(v)
}
override def merge(other: AccumulatorV2[String, HashMultiset[String]]): Unit = {
other match {
case o: MultisetAccumulator => multiset.addAll(o.value)
}
}
override def value: HashMultiset[String] = {
multiset
}
def count(v: String): Int = {
multiset.count(v)
}
}
统计使用
创建我们自定义的叠加器,注册到sparkContext
val multiset = new MultisetAccumulator
spark.sparkContext.register(multiset, "MultisetAccumulator")
multiset.add("phone")
需要注意的是,由于Spark的lazy机制,只有在save*这个action算子执行后我们才能得到multiset的正确结果。
广播变量
Spark的另一种共享变量是广播变量。通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,每次操作,driver都要把变量发送给worker节点一次,如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低。使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输。
所以这里接着使用广播变量,把multiset丢进去,然后在做一个rdd更新的操作。这个时候在map算子中可以正确获得上面multiset的正确结果
val broadcast = spark.sparkContext.broadcast(multiset)