ChatGPT解决这个技术问题 Extra ChatGPT

Scala 与 Python 的 Spark 性能

我更喜欢 Python 而不是 Scala。但是,由于 Spark 本身是用 Scala 编写的,因此出于显而易见的原因,我希望我的代码在 Scala 中比 Python 版本运行得更快。

有了这个假设,我想学习和为一些 1GB 的数据编写一些非常常见的预处理代码的 Scala 版本。数据是从 Kaggle 的 SpringLeaf 比赛中挑选出来的。只是为了概述数据(它包含 1936 个维度和 145232 行)。数据由各种类型组成,例如 int、float、string、boolean。我使用 8 个内核中的 6 个进行 Spark 处理;这就是我使用 minPartitions=6 的原因,以便每个内核都有要处理的内容。

斯卡拉代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = {
  val vals = line.split(delim1)
  (vals(0), vals(1))
}

val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = {
  val1 + "," + val2
}

val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

https://i.stack.imgur.com/uDJ30.png

https://i.stack.imgur.com/3wwcQ.png

两者都生成不同的 DAG 可视化图(因此,两张图片都显示了 Scala (map) 和 Python (reduceByKey) 的不同阶段 0 函数)

但是,基本上这两个代码都试图将数据转换为(dimension_id,值列表字符串)RDD 并保存到磁盘。输出将用于计算每个维度的各种统计信息。

在性能方面,此类真实数据的 Scala 代码的运行速度似乎比 Python 版本慢 4 倍。对我来说,好消息是它给了我继续使用 Python 的良好动力。坏消息是我不太明白为什么?

也许这取决于代码和应用程序,因为我得到了另一个结果,即 apache spark python is slower than scala, when summing a billion terms of the Leibniz formula for π
有趣的问题!顺便说一句,在这里也可以看看:emptypipes.org/2015/01/17/python-vs-scala-vs-spark 你拥有的内核越多,你看到的语言之间的差异就越少。
您是否考虑过 accepting 现有答案?

1
10465355

讨论代码的原始答案可以在下面找到。

首先,您必须区分不同类型的 API,每种 API 都有自己的性能考虑。

RDD API

(纯 Python 结构和基于 JVM 的编排)

这是受 Python 代码性能和 PySpark 实现细节影响最大的组件。虽然 Python 性能不太可能成为问题,但您至少需要考虑几个因素:

JVM 通信的开销。实际上,所有进出 Python 执行程序的数据都必须通过套接字和 JVM 工作程序。虽然这是一种相对有效的本地通信,但它仍然不是免费的。

基于进程的执行器 (Python) 与基于线程的(单个 JVM 多线程)执行器 (Scala)。每个 Python 执行器都在自己的进程中运行。作为副作用,它提供了比其 JVM 对应物更强的隔离性和对执行程序生命周期的一些控制,但可能显着更高的内存使用量:已加载库的解释器内存占用足迹 广播效率较低(每个进程都需要自己的广播副本)

解释器内存占用

已加载库的足迹

广播效率较低(每个进程都需要自己的广播副本)

Python 代码本身的性能。一般来说,Scala 比 Python 快,但它会因任务而异。此外,您还有多种选择,包括像 Numba 这样的 JIT、C 扩展 (Cython) 或像 Theano 这样的专用库。最后,如果您不使用 ML / MLlib(或简单的 NumPy 堆栈),请考虑使用 PyPy 作为替代解释器。见 SPARK-3094。

PySpark 配置提供 spark.python.worker.reuse 选项,可用于在为每个任务派生 Python 进程和重用现有进程之间进行选择。后一种选择似乎有助于避免昂贵的垃圾收集(它更像是一种印象而不是系统测试的结果),而前一种(默认)对于昂贵的广播和导入来说是最佳选择。

引用计数作为 CPython 中的第一行垃圾收集方法,在典型的 Spark 工作负载(类流处理,无引用周期)中运行良好,并降低了长时间 GC 暂停的风险。

MLlib

(混合 Python 和 JVM 执行)

基本注意事项与以前几乎相同,但还有一些其他问题。虽然与 MLlib 一起使用的基本结构是普通的 Python RDD 对象,但所有算法都直接使用 Scala 执行。

这意味着将 Python 对象转换为 Scala 对象会产生额外的成本,反之亦然,会增加内存使用量以及一些我们将在后面介绍的额外限制。

截至目前 (Spark 2.x),基于 RDD 的 API 处于维护模式,并且是 scheduled to be removed in Spark 3.0

DataFrame API 和 Spark ML

(使用 Python 代码执行的 JVM 仅限于驱动程序)

这些可能是标准数据处理任务的最佳选择。由于 Python 代码大多仅限于驱动程序上的高级逻辑操作,因此 Python 和 Scala 之间应该没有性能差异。

一个例外是使用逐行 Python UDF,其效率明显低于其 Scala 等效项。虽然有一些改进的机会(Spark 2.0.0 已经有了实质性的发展),但最大的限制是内部表示 (JVM) 和 Python 解释器之间的完整往返。如果可能,您应该支持内置表达式的组合 (example。Python UDF 行为在 Spark 2.0.0 中得到了改进,但与本机执行相比仍然不是最佳的。

将来可能会改进随着 vectorized UDFs (SPARK-21190 and further extensions) 的引入而显着改进,它使用 Arrow Streaming 进行有效的数据交换和零拷贝反序列化。对于大多数应用程序,它们的次要开销可以忽略不计。

还要确保避免在 DataFramesRDDs 之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说与 Python 解释器之间的数据传输。

值得注意的是,Py4J 调用具有相当高的延迟。这包括简单的调用,例如:

from pyspark.sql.functions import col

col("foo")

通常,这无关紧要(开销是恒定的,并且不取决于数据量),但在软实时应用程序的情况下,您可以考虑缓存/重用 Java 包装器。

GraphX 和 Spark 数据集

至于现在(Spark 1.6 2.1)都没有提供 PySpark API,所以你可以说 PySpark 比 Scala 差了很多。

实际上,GraphX 开发几乎完全停止,该项目目前处于 related JIRA tickets closed as won't fix 的维护模式。 GraphFrames 库提供了一个带有 Python 绑定的替代图形处理库。

主观地说,在 Python 中静态类型的 Datasets 并没有太多的地方,即使当前的 Scala 实现过于简单,也无法提供与 DataFrame 相同的性能优势。

流媒体

从我目前看到的情况来看,我强烈建议使用 Scala 而不是 Python。如果 PySpark 获得对结构化流的支持,它可能会在未来发生变化,但现在 Scala API 似乎更加健壮、全面和高效。我的经验相当有限。

Spark 2.x 中的结构化流似乎缩小了语言之间的差距,但目前仍处于早期阶段。尽管如此,基于 RDD 的 API 在 Databricks Documentation(访问日期 2017-03-03)中已被称为“传统流式传输”,因此有理由期待进一步的统一努力。

非性能注意事项

并非所有 Spark 功能都通过 PySpark API 公开。请务必检查您需要的部分是否已经实现,并尝试了解可能的限制。

当您使用 MLlib 和类似的混合上下文时,这一点尤其重要(请参阅 Calling Java/Scala function from a task)。公平地说,PySpark API 的某些部分(如 mllib.linalg)提供了比 Scala 更全面的方法集。

PySpark API 紧密地反映了它的 Scala 对应物,因此并不完全是 Pythonic。这意味着在语言之间进行映射非常容易,但与此同时,Python 代码可能更难理解。

与纯 JVM 执行相比,PySpark 数据流相对复杂。推理 PySpark 程序或调试要困难得多。此外,至少必须具备对 Scala 和 JVM 的基本了解。

Dataset API 的持续转变,以及冻结的 RDD API 为 Python 用户带来了机遇和挑战。虽然 API 的高级部分更容易在 Python 中公开,但更高级的功能几乎不可能直接使用。

此外,原生 Python 函数仍然是 SQL 世界中的二等公民。希望这将在未来通过 Apache Arrow 序列化(current efforts target data collection 但 UDF serde 是 long term goal)得到改进。

对于强烈依赖 Python 代码库的项目,纯 Python 替代方案(如 DaskRay)可能是一个有趣的替代方案。

它不必是一个与另一个

Spark DataFrame (SQL, Dataset) API 提供了一种在 PySpark 应用程序中集成 Scala/Java 代码的优雅方式。您可以使用 DataFrames 将数据公开给本机 JVM 代码并读回结果。我已经解释了一些选项 somewhere else,您可以在 How to use a Scala class inside Pyspark 中找到 Python-Scala 往返的工作示例。

它可以通过引入用户定义的类型来进一步增强(参见 How to define schema for custom type in Spark SQL?)。

问题中提供的代码有什么问题

(免责声明:Pythonista 的观点。很可能我错过了一些 Scala 技巧)

首先,您的代码中有一部分根本没有意义。如果您已经使用 zipWithIndexenumerate 创建了 (key, value) 对,那么创建字符串只是为了在之后将其拆分有什么意义? flatMap 不能递归工作,因此您可以简单地产生元组并跳过 map 之后的任何内容。

我发现有问题的另一部分是reduceByKey。一般来说,如果应用聚合函数可以减少必须洗牌的数据量,reduceByKey 很有用。由于您只是连接字符串,因此这里没有任何收获。忽略低级别的东西,例如引用的数量,您必须传输的数据量与 groupByKey 完全相同。

通常我不会详述这一点,但据我所知,这是您的 Scala 代码中的瓶颈。在 JVM 上连接字符串是一项相当昂贵的操作(参见示例:Is string concatenation in scala as costly as it is in Java?)。这意味着类似 _.reduceByKey((v1: String, v2: String) => v1 + ',' + v2) 的内容在您的代码中等同于 input4.reduceByKey(valsConcat) 不是一个好主意。

如果您想避免 groupByKey,可以尝试将 aggregateByKeyStringBuilder 一起使用。与此类似的东西应该可以解决问题:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => {
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  },
  (acc1, acc2) => {
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  }
)

但我怀疑这是否值得大惊小怪。

牢记上述内容,我将您的代码重写如下:

斯卡拉:

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex{
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}

val pairs = input.flatMap(line => line.split(",").zipWithIndex.map{
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
})

val result = pairs.groupByKey.map{
  case (k, vals) =>  {
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  }
}

result.saveAsTextFile("scalaout")

Python:

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "{0},{1}".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

结果

local[6] 模式下(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行程序需要 4GB 内存(n = 3):

斯卡拉 - 平均:250.00s,标准差:12.49

Python - 平均:246.66s,标准差:1.15

我很确定大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。只是为了好玩,下面是简单的 Python 单线程代码,它可以在不到一分钟的时间内在这台机器上执行相同的任务:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

这是同一个任务吗?最后一个 zip 不是很懒,而且没有保存到文件吗?
佚名

扩展到上述答案 -

与 python 相比,Scala 在很多方面都证明速度更快,但有一些正当的原因可以解释为什么 python 变得比 scala 更受欢迎,让我们看看其中的几个——

Python for Apache Spark 非常容易学习和使用。然而,这并不是 Pyspark 比 Scala 更好的选择的唯一原因。还有更多。

Spark 的 Python API 可能在集群上速度较慢,但最终,与 Scala 相比,数据科学家可以用它做更多的事情。没有 Scala 的复杂性。界面简单而全面。

谈代码的可读性、维护和 Python API for Apache Spark 的熟悉程度远胜于 Scala。

Python 附带了几个与机器学习和自然语言处理相关的库。这有助于数据分析,并且还具有成熟且经过时间考验的统计数据。例如,numpy、pandas、scikit-learn、seaborn 和 matplotlib。

注意:大多数数据科学家使用混合方法,他们使用两种 API 中最好的方法。

最后,Scala 社区对程序员的帮助往往少得多。这使得 Python 成为一个非常有价值的学习。如果您对任何静态类型编程语言(如 Java)有足够的经验,您就可以不用担心完全不使用 Scala。