ChatGPT解决这个技术问题 Extra ChatGPT

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

Let's say I have a rather large dataset in the following form:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

What I would like to do is remove duplicate rows based on the values of the first,third and fourth columns only.

Removing entirely duplicate rows is straightforward:

data = data.distinct()

and either row 5 or row 6 will be removed

But how do I only remove duplicate rows based on columns 1, 3 and 4 only? i.e. remove either one one of these:

('Baz',22,'US',6)
('Baz',36,'US',6)

In Python, this could be done by specifying columns with .drop_duplicates(). How can I achieve the same in Spark/Pyspark?


J
Joop

Pyspark does include a dropDuplicates() method, which was introduced in 1.4. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

Is there a way to capture the records that it did drop?
x = usersDf.drop_duplicates(subset=['DETUserId']) - X dataframe will be all the dropped records
@Rodney That is not what the documentation says: "Return a new DataFrame with duplicate rows removed, optionally only considering certain columns." spark.apache.org/docs/2.1.0/api/python/…
a
analyticalpicasso

From your question, it is unclear as-to which columns you want to use to determine duplicates. The general idea behind the solution is to create a key based on the values of the columns that identify duplicates. Then, you can use the reduceByKey or reduce operations to eliminate duplicates.

Here is some code to get you started:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

Now, you have a key-value RDD that is keyed by columns 1,3 and 4. The next step would be either a reduceByKey or groupByKey and filter. This would eliminate duplicates.

r = m.reduceByKey(lambda x,y: (x))

D
David Griffin

I know you already accepted the other answer, but if you want to do this as a DataFrame, just use groupBy and agg. Assuming you had a DF already created (with columns named "col1", "col2", etc) you could do:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

Note that in this case, I chose the Max of col2, but you could do avg, min, etc.


So far, my experience with DataFrames is that they make everything more elegant and a lot faster.
It should be noted that this answer is written in Scala - for pyspark replace $"col1" with col("col1") etc.
t
technotring

Agree with David. To add on, it may not be the case that we want to groupBy all columns other than the column(s) in aggregate function i.e, if we want to remove duplicates purely based on a subset of columns and retain all columns in the original dataframe. So the better way to do this could be using dropDuplicates Dataframe api available in Spark 1.4.0

For reference, see: https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame


Do we have corresponding function in SparkR?
A
Aravind Krishnakumar

I used inbuilt function dropDuplicates(). Scala code given below

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

Output :

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+

The question specifically asks for pyspark implementation, not scala
S
Sampat Kumar

The below programme will help you drop duplicates on whole , or if you want to drop duplicates based on certain columns , you can even do that:

import org.apache.spark.sql.SparkSession

object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()

import spark.implicits._

// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)

// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

println("*** Here's the whole DataFrame with duplicates")

customerDF.printSchema()

customerDF.show()

// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()

println("*** Now without duplicates")

withoutDuplicates.show()

val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

println("*** Now without partial duplicates too")

withoutPartials.show()

 }
 }

The comment "// drop fully identical rows" is correct the first time, and incorrect the second time. Perhaps a copy/paste error?
Thanks @JoshuaStafford , removed the bad comment.
N
Nilesh Shinde

This is my Df contain 4 is repeated twice so here will remove repeated values.

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf=df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+

you can check in spark-shell i have shared the correct output.. this ans is s related to how we can remove repeated values in column or df..
Can you provide an example based on OPs question?
I have given example in my answer it self. you can refer that one.
Your post adds no value to this discussion. @vaerek has already posted a PySpark df.dropDuplicates() example including how it can be applied to more than one column (my initial question).

关注公众号,不定期副业成功案例分享
Follow WeChat

Success story sharing

Want to stay one step ahead of the latest teleworks?

Subscribe Now