根据学习火花
请记住,重新分区数据是一项相当昂贵的操作。 Spark 也有一个名为 coalesce() 的 repartition() 优化版本,它允许避免数据移动,但前提是您要减少 RDD 分区的数量。
我得到的一个区别是,使用 repartition()
可以增加/减少分区数,但使用 coalesce()
只能减少分区数。
如果分区分布在多台机器上并运行 coalesce()
,它如何避免数据移动?
它避免了完全洗牌。如果知道数量正在减少,那么执行程序可以安全地将数据保存在最小数量的分区上,只需将数据从额外的节点移到我们保留的节点上。
所以,它会是这样的:
Node 1 = 1,2,3
Node 2 = 4,5,6
Node 3 = 7,8,9
Node 4 = 10,11,12
然后 coalesce
到 2 个分区:
Node 1 = 1,2,3 + (10,11,12)
Node 3 = 7,8,9 + (4,5,6)
请注意,节点 1 和节点 3 不需要移动其原始数据。
贾斯汀的回答很棒,而且这个回答更深入。
repartition
算法进行完全洗牌并使用均匀分布的数据创建新分区。让我们创建一个数字从 1 到 12 的 DataFrame。
val x = (1 to 12).toList
val numbersDf = x.toDF("number")
numbersDf
在我的机器上包含 4 个分区。
numbersDf.rdd.partitions.size // => 4
以下是数据在分区上的划分方式:
Partition 00000: 1, 2, 3
Partition 00001: 4, 5, 6
Partition 00002: 7, 8, 9
Partition 00003: 10, 11, 12
让我们使用 repartition
方法进行一次完全随机播放,并在两个节点上获取此数据。
val numbersDfR = numbersDf.repartition(2)
以下是 numbersDfR
数据在我的机器上的分区方式:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11
repartition
方法创建新分区并将数据均匀分布在新分区中(对于较大的数据集,数据分布更均匀)。
coalesce
和 repartition
之间的区别
coalesce
使用现有分区来最大限度地减少混洗的数据量。 repartition
创建新分区并进行完全洗牌。 coalesce
导致具有不同数据量的分区(有时分区具有很大不同的大小),而 repartition
导致大小大致相等的分区。
coalesce
或 repartition
更快吗?
coalesce
的运行速度可能比 repartition
快,但不等大小的分区通常比等大小的分区运行得慢。您通常需要在过滤大型数据集后重新分区数据集。我发现 repartition
总体上更快,因为 Spark 是为使用相同大小的分区而构建的。
注意我好奇地观察到repartition can increase the size of data on disk。确保在大型数据集上使用重新分区/合并时运行测试。
Read this blog post 如果您想了解更多详情。
当您在实践中使用合并和重新分区时
请参阅此问题,了解如何使用合并和重新分区将 DataFrame 写入单个文件
运行过滤查询后重新分区至关重要。过滤后分区的数量不会改变,所以如果你不重新分区,你会有太多的内存分区(过滤器减少数据集大小越多,问题就越大)。注意空分区问题。
partitionBy 用于写出磁盘分区中的数据。在使用 partitionBy 之前,您需要使用 repartition / coalesce 对内存中的数据进行正确分区。
rdd.glom().map(len).collect()
但它给出了很多 OOM 错误。
repartition
算法对于非常小的数据集不会平均分配数据。我使用 repartition
将 500 万条记录组织到 13 个分区中,每个文件的大小在 89.3 MB 和 89.6 MB 之间 - 相当!
repartition
- 建议在增加分区数量的同时使用它,因为它涉及所有数据的洗牌。
coalesce
- 建议在减少分区数量的同时使用它。例如,如果您有 3 个分区并且想要将其减少到 2 个,coalesce
会将第 3 个分区的数据移动到分区 1 和 2。分区 1 和 2 将保留在同一个容器中。另一方面,repartition
将对所有分区中的数据进行洗牌,因此执行器之间的网络使用率会很高,并且会影响性能。
coalesce
的性能优于 repartition
,同时减少了分区数。
这里要注意的另一点是,Spark RDD 的基本原则是不变性。重新分区或合并将创建新的 RDD。基本 RDD 将继续存在其原始分区数。如果用例需要将 RDD 持久化到缓存中,那么必须对新创建的 RDD 执行相同的操作。
scala> pairMrkt.repartition(10)
res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
scala> res16.partitions.length
res17: Int = 10
scala> pairMrkt.partitions.length
res20: Int = 2
code 和代码文档的内容是 coalesce(n)
与 coalesce(n, shuffle = false)
相同,repartition(n)
与 coalesce(n, shuffle = true)
相同
因此,coalesce
和 repartition
都可用于增加分区数
使用 shuffle = true,您实际上可以合并到更多的分区。如果您有少量分区(例如 100 个),可能会有一些分区异常大,这很有用。
另一个需要强调的重要注意事项是,如果您大幅减少分区数量,您应该考虑使用 shuffled 版本的 coalesce
(在这种情况下与 repartition
相同)。这将允许您的计算在父分区上并行执行(多任务)。
然而,如果你正在做一个剧烈的合并,例如 numPartitions = 1,这可能会导致你的计算发生在比你想要的更少的节点上(例如,在 numPartitions = 1 的情况下是一个节点)。为避免这种情况,您可以传递 shuffle = true。这将添加一个 shuffle 步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。
另请参阅相关答案here
所有的答案都为这个经常被问到的问题增加了一些知识。
所以按照这个问题的时间线的传统,这是我的 2 美分。
在非常特殊的情况下,我发现重新分区比合并更快。
在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区的工作速度更快。
这就是我的意思
if(numFiles > 20)
df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
else
df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
在上面的代码片段中,如果我的文件少于 20 个,则合并需要永远完成,而重新分区要快得多,所以上面的代码。
当然,这个数字(20)将取决于工人的数量和数据量。
希望有帮助。
sc._jsc.sc().getExecutorMemoryStatus().size()
其中 sc 是 pyspark SparkContext 对象。如果您使用的是 scala 或 java,那就更简单了:sc.getExecutorMemoryStatus().size()
即使在@Rob的答案中提到的分区号减少的情况下,repartition >> coalesce 也有一个用例,即将数据写入单个文件。
@Rob 的回答暗示了好的方向,但我认为需要进一步解释才能了解幕后发生的事情。
如果您需要在写入之前过滤数据,那么重新分区比合并更合适,因为合并将在加载操作之前被下推。
例如:load().map(…).filter(…).coalesce(1).save()
转换为:load().coalesce(1).map(…).filter(…).save()
这意味着您的所有数据都将折叠到一个分区中,在那里它将被过滤,失去所有并行性。即使对于像 column='value'
这样非常简单的过滤器也会发生这种情况。
重新分区不会发生这种情况:load().map(…).filter(…).repartition(1).save()
在这种情况下,过滤会在原始分区上并行进行。
只是给出一个数量级,在我的情况下,在从 Hive 表加载后过滤 109M 行(~105G)和 ~1000 个分区时,运行时间从合并(1)的 ~6h 下降到重新分区(1)的 ~2m .
具体示例取自this article from AirBnB,它非常好,涵盖了 Spark 中重新分区技术的更多方面。
重新分区:将数据洗牌到新数量的分区中。
例如。初始数据帧被划分为 200 个分区。
df.repartition(500)
:数据将从 200 个分区洗牌到新的 500 个分区。
合并:将数据洗牌到现有数量的分区中。
df.coalesce(5)
:数据将从剩余的 195 个分区打乱到现有的 5 个分区。
我想在 Justin 和 Power 的回答中补充一点——
repartition
将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以提及分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。
coalesce
将使用现有分区并对其中的一个子集进行洗牌。它无法像 repartition
那样修复数据偏差。因此,即使它更便宜,它也可能不是您需要的东西。
对于所有出色的答案,我想补充一点,repartition
是利用数据并行化的最佳选择之一。虽然 coalesce
提供了一个减少分区的廉价选项,并且在将数据写入 HDFS 或其他接收器以利用大写入时非常有用。
我发现这在以镶木地板格式编写数据以充分利用数据时很有用。
基本上,Repartition 允许您增加或减少分区的数量。重新分区重新分配来自所有分区的数据,这会导致完全洗牌,这是非常昂贵的操作。
Coalesce 是 Repartition 的优化版本,您只能减少分区的数量。由于我们只能减少分区的数量,它所做的就是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在 Coalesce 中是最小的数据移动,但说 coalesce 不进行数据移动是完全错误的说法。
另一件事是通过提供分区数量进行重新分区,它尝试在所有分区上均匀地重新分配数据,而在 Coalesce 的情况下,在某些情况下我们仍然可能会出现倾斜数据。
对于从 PySpark (AWS EMR) 生成单个 csv 文件作为输出并将其保存在 s3 上时遇到问题的人,使用重新分区会有所帮助。原因是,coalesce 不能完全洗牌,但 repartition 可以。本质上,您可以使用 repartition 增加或减少分区数,但只能使用 coalesce 减少分区数(但不能减少 1)。以下是尝试将 csv 从 AWS EMR 写入 s3 的任何人的代码:
df.repartition(1).write.format('csv')\
.option("path", "s3a://my.bucket.name/location")\
.save(header = 'true')
Coalesce 使用现有的分区来最小化被洗牌的数据量。重新分区创建新分区并进行完全洗牌。
合并导致具有不同数据量的分区(有时分区具有许多不同的大小),重新分区导致大小大致相等的分区。
合并我们可以减少分区,但我们可以使用修复来增加和减少分区。
但你也应该确保,如果你正在处理大量数据,即将合并节点的数据应该具有高度配置。因为所有的数据都会被加载到那些节点上,可能会导致内存异常。虽然维修费用很高,但我更喜欢使用它。因为它对数据进行了随机分配和平均分配。
在合并和重新分区之间进行选择是明智的。
repartition
算法对数据进行完全洗牌并创建大小相等的数据分区。 coalesce
组合现有分区以避免完全洗牌。
Coalesce 非常适合采用具有大量分区的 RDD,并在单个工作节点上组合分区以生成具有较少分区的最终 RDD。
Repartition
将重新排列 RDD 中的数据以生成您请求的最终分区数。 DataFrame 的分区似乎是一个应该由框架管理的低级实现细节,但事实并非如此。在将大型 DataFrame 过滤成较小的 DataFrame 时,您几乎应该总是对数据进行重新分区。您可能会经常将大型 DataFrame 过滤成较小的 DataFrame,因此请习惯于重新分区。
Read this blog post 如果您想了解更多详情。
以一种简单的方式 COALESCE :- 仅用于减少分区数量,没有数据混洗,它只是压缩分区
REPARTITION:- 用于增加和减少分区的数量,但会发生洗牌
例子:-
val rdd = sc.textFile("path",7)
rdd.repartition(10)
rdd.repartition(2)
两者都工作正常
但是当我们需要在一个集群中查看输出时,我们通常会考虑这两件事,我们会这样做。
另一个区别是考虑到存在倾斜连接并且您必须在其之上合并的情况。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。
另一种情况是,假设您在数据框中保存了中/大量数据,并且您必须批量生产到 Kafka。在某些情况下,重新分区有助于在生产到 Kafka 之前收集列表。但是,当卷非常高时,重新分区可能会导致严重的性能影响。在这种情况下,直接从数据帧生成到 Kafka 会有所帮助。
旁注:Coalesce 不会像工作人员之间的完整数据移动那样避免数据移动。它确实减少了发生的洗牌次数。我想这就是这本书的意思。
以下是代码级别的一些其他详细信息/差异:
此处仅添加函数定义,完整代码实现请查看 spark 的 github 页面。
以下是可用于对数据框进行重新分区的不同方法:检查完整实现 here。
def repartition(numPartitions: Int): Dataset[T]
每当我们在数据帧上调用上述方法时,它都会返回一个新的数据集,该数据集恰好具有 numPartitions 分区。
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个由给定分区表达式分区的新数据集到 numPartitions。生成的数据集是散列分区的。
def repartition(partitionExprs: Column*): Dataset[T]
上述方法返回一个由给定分区表达式分区的新数据集,使用 spark.sql.shuffle.partitions 作为分区数。生成的数据集是散列分区的。
def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
上述方法返回一个由给定分区表达式分区的新数据集到 numPartitions。生成的数据集是范围分区的。
def repartitionByRange(partitionExprs: Column*): Dataset[T]
上述方法返回一个由给定分区表达式分区的新数据集,使用 spark.sql.shuffle.partitions 作为分区数。生成的数据集是范围分区的。
但是对于合并,我们在数据帧上只有以下方法:
def coalesce(numPartitions: Int): Dataset[T]
上面的方法将返回一个新的数据集,它正好有 numPartitions
个分区
以下是 RDD 上可用于重新分区和合并的方法:检查完整实现 here。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
基本上,repartition 方法通过将 shuffle 值传递为 true 来调用 coalesce 方法。现在,如果我们通过将 shuffle 值传递为 true 在 RDD 上使用合并方法,我们也可以增加分区!
coalesce -- 可以增加或减少分区 repartition -- 只能增加分区
但我会说性能完全基于用例。并不总是合并比重新分区更好。
Coalesce 比 repartition 执行得更好。合并总是减少分区。假设如果您在 yarn 中启用动态分配,您有四个分区和执行器。如果对其应用过滤器,则一个或多个执行器可能是空的,没有数据。这是可以通过合并而不是重新分区来解决的问题。
不定期副业成功案例分享
minimize data movement
而不是avoiding data movement
。repartition
而不是coalesce
?repartition
所做的只是调用coalesce
并将shuffle
参数设置为 true。让我知道这是否有帮助。shuffle = true
标志调用 coalesce