ChatGPT解决这个技术问题 Extra ChatGPT

根据 RDD/Spark DataFrame 中的特定列从行中删除重复项

假设我有一个相当大的数据集,格式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

我想做的是仅根据第一列、第三列和第四列的值删除重复的行。

删除完全重复的行很简单:

data = data.distinct()

并且将删除第 5 行或第 6 行

但是我如何只删除基于第 1、3 和 4 列的重复行?即删除其中之一:

('Baz',22,'US',6)
('Baz',36,'US',6)

在 Python 中,这可以通过使用 .drop_duplicates() 指定列来完成。如何在 Spark/Pyspark 中实现相同的目标?


J
Joop

Pyspark 确实包含一个在 1.4 中引入的 dropDuplicates() 方法。 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

有没有办法捕获它确实丢失的记录?
x = usersDf.drop_duplicates(subset=['DETUserId']) - X 数据框将是所有删除的记录
@Rodney这不是文档所说的:“返回一个删除重复行的新DataFrame,可选地只考虑某些列。” spark.apache.org/docs/2.1.0/api/python/…
a
analyticalpicasso

从您的问题来看,尚不清楚您要使用哪些列来确定重复项。该解决方案背后的总体思路是根据标识重复项的列的值创建一个键。然后,您可以使用 reduceByKey 或 reduce 操作来消除重复项。

以下是一些帮助您入门的代码:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

现在,您有一个键值对 RDD,它由第 1,3 和 4 列作为键。下一步将是 reduceByKeygroupByKeyfilter。这将消除重复。

r = m.reduceByKey(lambda x,y: (x))

D
David Griffin

我知道您已经接受了另一个答案,但是如果您想将其作为 DataFrame 执行,只需使用 groupBy 和 agg。假设您已经创建了一个 DF(列名为“col1”、“col2”等),您可以执行以下操作:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

请注意,在这种情况下,我选择了 col2 的 Max,但您可以使用 avg、min 等。


到目前为止,我对 DataFrames 的体验是它们让一切变得更加优雅和快速。
应该注意的是,这个答案是用 Scala 编写的 - 对于 pyspark 将 $"col1" 替换为 col("col1") 等。
t
technotring

同意大卫。补充一点,我们可能不希望对聚合函数中除列之外的所有列进行分组,即,如果我们想纯粹基于列的子集删除重复项并保留原始数据框中的所有列.因此,更好的方法是使用 Spark 1.4.0 中提供的 dropDuplicates Dataframe api

如需参考,请参阅:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame


我们在 SparkR 中是否有相应的功能?
A
Aravind Krishnakumar

我使用了内置函数 dropDuplicates()。下面给出的Scala代码

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

输出 :

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

该问题专门要求 pyspark 实施,而不是 scala
S
Sampat Kumar

下面的程序将帮助您完全删除重复项,或者如果您想根据某些列删除重复项,您甚至可以这样做:

import org.apache.spark.sql.SparkSession

object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()

import spark.implicits._

// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)

// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

println("*** Here's the whole DataFrame with duplicates")

customerDF.printSchema()

customerDF.show()

// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()

println("*** Now without duplicates")

withoutDuplicates.show()

val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

println("*** Now without partial duplicates too")

withoutPartials.show()

 }
 }

注释“// 删除完全相同的行”第一次是正确的,第二次是错误的。也许是复制/粘贴错误?
谢谢@JoshuaStafford,删除了不好的评论。
N
Nilesh Shinde

这是我的 Df contains 4 重复两次,所以这里将删除重复的值。

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf=df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+

您可以检查 spark-shell 我已经共享了正确的输出。这个答案与我们如何删除列或 df 中的重复值有关。
您能否提供一个基于 OP 问题的示例?
我在自己的回答中给出了例子。你可以参考那个。
你的帖子没有增加这个讨论的价值。 @vaerek 已经发布了一个 PySpark df.dropDuplicates() 示例,包括如何将其应用于多个列(我最初的问题)。

关注公众号,不定期副业成功案例分享
关注公众号

不定期副业成功案例分享

领先一步获取最新的外包任务吗?

立即订阅