ChatGPT解决这个技术问题 Extra ChatGPT

Spark - 重新分区()与合并()

根据学习火花

请记住,重新分区数据是一项相当昂贵的操作。 Spark 也有一个名为 coalesce() 的 repartition() 优化版本,它允许避免数据移动,但前提是您要减少 RDD 分区的数量。

我得到的一个区别是,使用 repartition() 可以增加/减少分区数,但使用 coalesce() 只能减少分区数。

如果分区分布在多台机器上并运行 coalesce(),它如何避免数据移动?


J
Justin Pihony

它避免了完全洗牌。如果知道数量正在减少,那么执行程序可以安全地将数据保存在最小数量的分区上,只需将数据从额外的节点移到我们保留的节点上。

所以,它会是这样的:

Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12

然后 coalesce 到 2 个分区:

Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)

请注意,节点 1 和节点 3 不需要移动其原始数据。


感谢您的回复。文档应该更好地使用 minimize data movement 而不是 avoiding data movement
是否有任何情况下应该使用 repartition 而不是 coalesce
@Niemand 我认为当前的文档很好地涵盖了这一点:github.com/apache/spark/blob/… 请记住,repartition 所做的只是调用 coalesce 并将 shuffle 参数设置为 true。让我知道这是否有帮助。
是否可以减少现有分区文件的数量?我没有hdfs,但是很多文件都有问题。
repartition 在统计上会更慢,因为它不知道它正在缩小......尽管也许他们可以优化它。在内部,它只是使用 shuffle = true 标志调用 coalesce
P
Powers

贾斯汀的回答很棒,而且这个回答更深入。

repartition 算法进行完全洗牌并使用均匀分布的数据创建新分区。让我们创建一个数字从 1 到 12 的 DataFrame。

val x = (1 to 12).toList
val numbersDf = x.toDF("number")

numbersDf 在我的机器上包含 4 个分区。

numbersDf.rdd.partitions.size // => 4

以下是数据在分区上的划分方式:

Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12

让我们使用 repartition 方法进行一次完全随机播放,并在两个节点上获取此数据。

val numbersDfR = numbersDf.repartition(2)

以下是 numbersDfR 数据在我的机器上的分区方式:

Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

repartition 方法创建新分区并将数据均匀分布在新分区中(对于较大的数据集,数据分布更均匀)。

coalescerepartition 之间的区别

coalesce 使用现有分区来最大限度地减少混洗的数据量。 repartition 创建新分区并进行完全洗牌。 coalesce 导致具有不同数据量的分区(有时分区具有很大不同的大小),而 repartition 导致大小大致相等的分区。

coalescerepartition 更快吗?

coalesce 的运行速度可能比 repartition 快,但不等大小的分区通常比等大小的分区运行得慢。您通常需要在过滤大型数据集后重新分区数据集。我发现 repartition 总体上更快,因为 Spark 是为使用相同大小的分区而构建的。

注意我好奇地观察到repartition can increase the size of data on disk。确保在大型数据集上使用重新分区/合并时运行测试。

Read this blog post 如果您想了解更多详情。

当您在实践中使用合并和重新分区时

请参阅此问题,了解如何使用合并和重新分区将 DataFrame 写入单个文件

运行过滤查询后重新分区至关重要。过滤后分区的数量不会改变,所以如果你不重新分区,你会有太多的内存分区(过滤器减少数据集大小越多,问题就越大)。注意空分区问题。

partitionBy 用于写出磁盘分区中的数据。在使用 partitionBy 之前,您需要使用 repartition / coalesce 对内存中的数据进行正确分区。


很好的答案@Powers,但分区 A 和 B 中的数据不是倾斜的吗?它是如何均匀分布的?
此外,在不出现 OOM 错误的情况下获得分区大小的最佳方法是什么。我使用 rdd.glom().map(len).collect() 但它给出了很多 OOM 错误。
@anwartheravian - 分区 A 和分区 B 的大小不同,因为 repartition 算法对于非常小的数据集不会平均分配数据。我使用 repartition 将 500 万条记录组织到 13 个分区中,每个文件的大小在 89.3 MB 和 89.6 MB 之间 - 相当!
@Powers 这个看起来更详细的答案。
这更好地解释了差异。谢谢!
Z
ZygD

repartition - 建议在增加分区数量的同时使用它,因为它涉及所有数据的洗牌。

coalesce - 建议在减少分区数量的同时使用它。例如,如果您有 3 个分区并且想要将其减少到 2 个,coalesce 会将第 3 个分区的数据移动到分区 1 和 2。分区 1 和 2 将保留在同一个容器中。另一方面,repartition 将对所有分区中的数据进行洗牌,因此执行器之间的网络使用率会很高,并且会影响性能。

coalesce 的性能优于 repartition,同时减少了分区数。


有用的解释。
@Kamalesan C - 用简单的话很好的解释,我希望我能不止一次地支持这个答案。
H
Harikrishnan Ck

这里要注意的另一点是,Spark RDD 的基本原则是不变性。重新分区或合并将创建新的 RDD。基本 RDD 将继续存在其原始分区数。如果用例需要将 RDD 持久化到缓存中,那么必须对新创建的 RDD 执行相同的操作。

scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26

scala> res16.partitions.length
res17: Int = 10

scala>  pairMrkt.partitions.length
res20: Int = 2

好东西!这是至关重要的,至少对于这个经验丰富的 scala 开发人员来说,这并不明显——即,重新分区和合并都不会尝试修改数据,只是它是如何在节点之间分布的
@Harikrishnan 因此,如果我正确理解了其他答案,那么在合并 Spark 使用现有分区的情况下按照它们,但是由于 RDD 是不可变的,您能否描述 Coalesce 如何使用现有分区?根据我的理解,我认为 Spark 会将新分区附加到合并中的现有分区。
但是,如果执行图所知道的不再使用“旧”RDD,如果不持久,它将从内存中清除,不是吗?
M
Matteo Guarnerio

code 和代码文档的内容是 coalesce(n)coalesce(n, shuffle = false) 相同,repartition(n)coalesce(n, shuffle = true) 相同

因此,coalescerepartition 都可用于增加分区数

使用 shuffle = true,您实际上可以合并到更多的分区。如果您有少量分区(例如 100 个),可能会有一些分区异常大,这很有用。

另一个需要强调的重要注意事项是,如果您大幅减少分区数量,您应该考虑使用 shuffled 版本的 coalesce(在这种情况下与 repartition 相同)。这将允许您的计算在父分区上并行执行(多任务)。

然而,如果你正在做一个剧烈的合并,例如 numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,在 numPartitions = 1 的情况下是一个节点)。为避免这种情况,您可以传递 shuffle = true。这将添加一个 shuffle 步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。

另请参阅相关答案here


J
Jozef Dúc

所有的答案都为这个经常被问到的问题增加了一些知识。

所以按照这个问题的时间线的传统,这是我的 2 美分。

在非常特殊的情况下,我发现重新分区比合并更快。

在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区的工作速度更快。

这就是我的意思

if(numFiles > 20)
    df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
    df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)

在上面的代码片段中,如果我的文件少于 20 个,则合并需要永远完成,而重新分区要快得多,所以上面的代码。

当然,这个数字(20)将取决于工人的数量和数据量。

希望有帮助。


与其将其设置为像 20 这样的硬数字,不如将文件数与集群中的节点数进行比较可能更有意义。您可以使用以下代码行获取执行器的数量:sc._jsc.sc().getExecutorMemoryStatus().size() 其中 sc 是 pyspark SparkContext 对象。如果您使用的是 scala 或 java,那就更简单了:sc.getExecutorMemoryStatus().size()
A
Alessandro S.

即使在@Rob的答案中提到的分区号减少的情况下,repartition >> coalesce 也有一个用例,即将数据写入单个文件。

@Rob 的回答暗示了好的方向,但我认为需要进一步解释才能了解幕后发生的事情。

如果您需要在写入之前过滤数据,那么重新分区比合并更合适,因为合并将在加载操作之前被下推。

例如:load().map(…).filter(…).coalesce(1).save()

转换为:load().coalesce(1).map(…).filter(…).save()

这意味着您的所有数据都将折叠到一个分区中,在那里它将被过滤,失去所有并行性。即使对于像 column='value' 这样非常简单的过滤器也会发生这种情况。

重新分区不会发生这种情况:load().map(…).filter(…).repartition(1).save()

在这种情况下,过滤会在原始分区上并行进行。

只是给出一个数量级,在我的情况下,在从 Hive 表加载后过滤 109M 行(~105G)和 ~1000 个分区时,运行时间从合并(1)的 ~6h 下降到重新分区(1)的 ~2m .

具体示例取自this article from AirBnB,它非常好,涵盖了 Spark 中重新分区技术的更多方面。


你绝对确定这个吗?今天下午我会检查的。
在撰写本文时 100% 使用 Spark 2.4.x,还没有尝试过更新版本,如果你这样做了,请告诉我们! :)
OK 将在本周晚些时候查看 databricks 模拟。干杯
我在 CSV 文件上试过这个。 spark v 2.4.5,但我可以在 DAG 上看到正确的顺序。凝聚在不先来。您能否添加更多详细信息,例如 DAG 或物理计划以显示合并将首先被推下。
M
Matteo Guarnerio

重新分区:将数据洗牌到新数量的分区中。

例如。初始数据帧被划分为 200 个分区。

df.repartition(500):数据将从 200 个分区洗牌到新的 500 个分区。

合并:将数据洗牌到现有数量的分区中。

df.coalesce(5):数据将从剩余的 195 个分区打乱到现有的 5 个分区。


M
Matteo Guarnerio

我想在 Justin 和 Power 的回答中补充一点——

repartition 将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以提及分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。

coalesce 将使用现有分区并对其中的一个子集进行洗牌。它无法像 repartition 那样修复数据偏差。因此,即使它更便宜,它也可能不是您需要的东西。


M
Matteo Guarnerio

对于所有出色的答案,我想补充一点,repartition 是利用数据并行化的最佳选择之一。虽然 coalesce 提供了一个减少分区的廉价选项,并且在将数据写入 HDFS 或其他接收器以利用大写入时非常有用。

我发现这在以镶木地板格式编写数据以充分利用数据时很有用。


N
Nikunj Kakadiya

基本上,Repartition 允许您增加或减少分区的数量。重新分区重新分配来自所有分区的数据,这会导致完全洗牌,这是非常昂贵的操作。

Coalesce 是 Repartition 的优化版本,您只能减少分区的数量。由于我们只能减少分区的数量,它所做的就是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在 Coalesce 中是最小的数据移动,但说 coalesce 不进行数据移动是完全错误的说法。

另一件事是通过提供分区数量进行重新分区,它尝试在所有分区上均匀地重新分配数据,而在 Coalesce 的情况下,在某些情况下我们仍然可能会出现倾斜数据。


R
Rob

对于从 PySpark (AWS EMR) 生成单个 csv 文件作为输出并将其保存在 s3 上时遇到问题的人,使用重新分区会有所帮助。原因是,coalesce 不能完全洗牌,但 repartition 可以。本质上,您可以使用 repartition 增加或减少分区数,但只能使用 coalesce 减少分区数(但不能减少 1)。以下是尝试将 csv 从 AWS EMR 写入 s3 的任何人的代码:

df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')

P
Purushothaman Srikanth

Coalesce 使用现有的分区来最小化被洗牌的数据量。重新分区创建新分区并进行完全洗牌。

合并导致具有不同数据量的分区(有时分区具有许多不同的大小),重新分区导致大小大致相等的分区。

合并我们可以减少分区,但我们可以使用修复来增加和减少分区。


A
Arun Goudar

但你也应该确保,如果你正在处理大量数据,即将合并节点的数据应该具有高度配置。因为所有的数据都会被加载到那些节点上,可能会导致内存异常。虽然维修费用很高,但我更喜欢使用它。因为它对数据进行了随机分配和平均分配。

在合并和重新分区之间进行选择是明智的。


S
Sambhav Kumar

repartition 算法对数据进行完全洗牌并创建大小相等的数据分区。 coalesce 组合现有分区以避免完全洗牌。

Coalesce 非常适合采用具有大量分区的 RDD,并在单个工作节点上组合分区以生成具有较少分区的最终 RDD。

Repartition 将重新排列 RDD 中的数据以生成您请求的最终分区数。 DataFrame 的分区似乎是一个应该由框架管理的低级实现细节,但事实并非如此。在将大型 DataFrame 过滤成较小的 DataFrame 时,您几乎应该总是对数据进行重新分区。您可能会经常将大型 DataFrame 过滤成较小的 DataFrame,因此请习惯于重新分区。

Read this blog post 如果您想了解更多详情。


B
Bujuti Niranjan Reddy

以一种简单的方式 COALESCE :- 仅用于减少分区数量,没有数据混洗,它只是压缩分区

REPARTITION:- 用于增加和减少分区的数量,但会发生洗牌

例子:-

val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)

两者都工作正常

但是当我们需要在一个集群中查看输出时,我们通常会考虑这两件事,我们会这样做。


Coalese 也会有数据移动。
M
Miadnew

另一个区别是考虑到存在倾斜连接并且您必须在其之上合并的情况。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。

另一种情况是,假设您在数据框中保存了中/大量数据,并且您必须批量生产到 Kafka。在某些情况下,重新分区有助于在生产到 Kafka 之前收集列表。但是,当卷非常高时,重新分区可能会导致严重的性能影响。在这种情况下,直接从数据帧生成到 Kafka 会有所帮助。

旁注:Coalesce 不会像工作人员之间的完整数据移动那样避免数据移动。它确实减少了发生的洗牌次数。我想这就是这本书的意思。


M
Mohana B C

以下是代码级别的一些其他详细信息/差异:

此处仅添加函数定义,完整代码实现请查看 spark 的 github 页面。

以下是可用于对数据框进行重新分区的不同方法:检查完整实现 here

def repartition(numPartitions: Int): Dataset[T]

每当我们在数据帧上调用上述方法时,它都会返回一个新的数据集,该数据集恰好具有 numPartitions 分区。

def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个由给定分区表达式分区的新数据集到 numPartitions。生成的数据集是散列分区的。

 def repartition(partitionExprs: Column*): Dataset[T]

上述方法返回一个由给定分区表达式分区的新数据集,使用 spark.sql.shuffle.partitions 作为分区数。生成的数据集是散列分区的。

def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]

上述方法返回一个由给定分区表达式分区的新数据集到 numPartitions。生成的数据集是范围分区的。

def repartitionByRange(partitionExprs: Column*): Dataset[T]

上述方法返回一个由给定分区表达式分区的新数据集,使用 spark.sql.shuffle.partitions 作为分区数。生成的数据集是范围分区的。

但是对于合并,我们在数据帧上只有以下方法:

def coalesce(numPartitions: Int): Dataset[T] 

上面的方法将返回一个新的数据集,它正好有 numPartitions 个分区

以下是 RDD 上可用于重新分区和合并的方法:检查完整实现 here

  def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T]

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)

}

基本上,repartition 方法通过将 shuffle 值传递为 true 来调用 coalesce 方法。现在,如果我们通过将 shuffle 值传递为 true 在 RDD 上使用合并方法,我们也可以增加分区!


A
Aakash Agrawal

coalesce -- 可以增加或减少分区 repartition -- 只能增加分区

但我会说性能完全基于用例。并不总是合并比重新分区更好。


A
Abdul Wahab

Coalesce 比 repartition 执行得更好。合并总是减少分区。假设如果您在 yarn 中启用动态分配,您有四个分区和执行器。如果对其应用过滤器,则一个或多个执行器可能是空的,没有数据。这是可以通过合并而不是重新分区来解决的问题。


关注公众号,不定期副业成功案例分享
关注公众号

不定期副业成功案例分享

领先一步获取最新的外包任务吗?

立即订阅