
Removing duplicate columns after a DF join in Spark

When you join two DFs with similar column names:

df = df1.join(df2, df1['id'] == df2['id'])

The join works fine, but you can't reference the id column afterwards because it is ambiguous, and you get the following exception:

pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;"

This makes id unusable afterwards...

The following function solves the problem:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        # drop the copy that came from df2, keeping df1's version
        df = df.drop(df2[col])
    return df
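
For example, the helper can be called like this (a minimal sketch reusing the frames above):

df = join(df1, df2, df1['id'] == df2['id'], how='inner')
df.select('id').show()  # 'id' now resolves to df1's column only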

What I don't like about it is that I have to iterate over the column names and delete them one by one. This looks really clunky...

Do you know of any other solution that will either join and remove duplicates more elegantly or delete multiple columns without iterating over each of them?


Psidom

If the join columns in both data frames have the same names and you only need an equi-join, you can specify the join columns as a list, in which case the result will only keep one copy of the join columns:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+

df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+

df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

Otherwise you need to give the joined data frames aliases and refer to the duplicated columns by those aliases later:

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

A simple and elegant solution :) Now, if you want to select all columns from alias a and a single column from alias b, you can also use SQL syntax like .select("a.*", "b.val2")
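
Spelled out, that variant would look something like this (a sketch against the same df1 and df2 as above):

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.*", "b.val2").show()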
jerrytim

With df.join(other, on, how), when on is a column-name string or a list of column-name strings, the returned dataframe will prevent duplicate columns. When on is a join expression, it will result in duplicate columns. We can use .drop(df.a) to drop the duplicate column. Example:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)

That's… unintuitive (different behavior depending on the form of on). But great to know -- thanks.
This solution did not work for me (in Spark 3). When trying to drop a column using a reference like this, I get an error: each col in the param list should be a string.
Same here, it did not work for me in Spark 3 either...
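
If drop-by-reference misbehaves on your Spark version, one workaround is to alias both frames and select the surviving columns explicitly instead (a minimal sketch reusing the df/other names from this answer; it assumes other has exactly the columns a, bb and ccc):

joined = df.alias("l").join(other.alias("r"), cond, "inner")
# keep everything from the left side and only the
# non-duplicate columns from the right side
result = joined.select("l.*", "r.bb", "r.ccc")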
Heapify

Assuming 'a' is a dataframe with column 'id' and 'b' is another dataframe with column 'id':

I use the following two methods to remove duplicates:

Method 1: Using a string join expression as opposed to a boolean expression. This automatically removes the duplicate column for you:

a.join(b, 'id')

Method 2: Renaming the column before the join and dropping it after

b = b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id')

hussam

The code below works with Spark 1.6.0 and above.

salespeople_df.show()
+---+------+-----+
|Num|  Name|Store|
+---+------+-----+
|  1| Henry|  100|
|  2| Karen|  100|
|  3|  Paul|  101|
|  4| Jimmy|  102|
|  5|Janice|  103|
+---+------+-----+

storeaddress_df.show()
+-----+--------------------+
|Store|             Address|
+-----+--------------------+
|  100|   64 E Illinois Ave|
|  101|         74 Grand Pl|
|  102|          2298 Hwy 7|
|  103|No address available|
+-----+--------------------+

Assuming, in this example, that the name of the shared column is the same:

joined = salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()

+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|   64 E Illinois Ave|
|  100|  2| Karen|   64 E Illinois Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+

.join will prevent the duplication of the shared column.

Let's assume that you want to remove the column Num in this example; you can just use .drop('colname'):

joined = joined.drop('Num')
joined.show()

+-----+------+--------------------+
|Store|  Name|             Address|
+-----+------+--------------------+
|  103|Janice|No address available|
|  100| Henry|   64 E Illinois Ave|
|  100| Karen|   64 E Illinois Ave|
|  101|  Paul|         74 Grand Pl|
|  102| Jimmy|          2298 Hwy 7|
+-----+------+--------------------+

QA Collective

After I've joined multiple tables together, I run them through a simple function to drop duplicate columns in the DF as it walks from left to right. Alternatively, you could rename these columns too (a rename sketch follows this answer).

Where Names is a table with columns ['Id', 'Name', 'DateId', 'Description'] and Dates is a table with columns ['Id', 'Date', 'Description'], the columns Id and Description will be duplicated after being joined.

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.write.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Where dropDupeDfCols is defined as:

def dropDupeDfCols(df):
    newcols = []
    dupcols = []

    # record the first occurrence of each name and note the positions of repeats
    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # temporarily rename every column to its index so the repeats
    # can be dropped unambiguously, then restore the unique names
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Date'].
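
As mentioned above, renaming instead of dropping is also an option. A sketch of that variant under the same assumptions (the _dup suffix is my own choice, not anything Spark mandates):

def renameDupeDfCols(df):
    seen = {}
    newcols = []
    for c in df.columns:
        if c in seen:
            # suffix repeats so every name stays unique
            seen[c] += 1
            newcols.append('{}_dup{}'.format(c, seen[c]))
        else:
            seen[c] = 0
            newcols.append(c)
    return df.toDF(*newcols)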


Santosh Kumar

In my case I had a dataframe with multiple duplicate columns after joins, and I was trying to save that dataframe in CSV format, but due to the duplicate columns I was getting an error. I followed the steps below to drop the duplicate columns. The code is in Scala.

1) Rename all the duplicate columns and make a new dataframe
2) Make a separate list for all the renamed columns
3) Make a new dataframe with all columns (including renamed - step 1)
4) Drop all the renamed columns

private def removeDuplicateColumns(dataFrame: DataFrame): DataFrame = {
  val allColumns: mutable.MutableList[String] = mutable.MutableList()
  val dupColumns: mutable.MutableList[String] = mutable.MutableList()
  dataFrame.columns.foreach { (i: String) =>
    if (allColumns.contains(i)) {
      // rename the repeat so it can be dropped by name afterwards
      allColumns += "dup_" + i
      dupColumns += "dup_" + i
    } else {
      allColumns += i
    }
  }
  val df = dataFrame.toDF(allColumns.toSeq: _*)
  df.drop(dupColumns.toSeq: _*)
}

To call the above function, use the code below and pass your dataframe containing the duplicate columns:

val uniColDF = removeDuplicateColumns(df)

Thanks, this solution works! Though there are some minor syntax errors. Also don't forget the imports: import org.apache.spark.sql.DataFrame and import scala.collection.mutable
Jeremy Caney

Here is a simple solution for removing a duplicate column:

final_result = df1.join(df2, df1['subjectid'] == df2['subjectid'], "left").drop(df1['subjectid'])

Anthony Awuley

If you join on a list or string, duplicate columns are automatically removed. This is a Scala solution; you could translate the same idea into any language (a PySpark sketch follows the code below).

// get a list of duplicate columns, or use a list/seq
// of columns you would like to join on (note that this list
// should include columns for which you do not want duplicates)
val duplicateCols = df1.columns.intersect(df2.columns)

// no duplicate columns in resulting DF
df1.join(df2, duplicateCols.distinct.toSeq)
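
The same idea in PySpark would be something like this (a minimal sketch):

# columns present in both frames; joining on this list keeps one copy of each
duplicate_cols = [c for c in df1.columns if c in df2.columns]
df1.join(df2, duplicate_cols)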

Mohana B C

Spark SQL version of this answer:

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("select * from t1 inner join t2 using (id)").show()

# +---+----+----+
# | id|val1|val2|
# +---+----+----+
# |  1|   2|   2|
# |  1|   2|   3|
# |  2|   3|   4|
# +---+----+----+

Jha Ayush

This works for me when multiple columns are used in the join and I need to drop more than one column that is not a string type:

final_data = mdf1.alias("a").join(
    df3.alias("b"),
    (mdf1.unique_product_id == df3.unique_product_id) &
    (mdf1.year_week == df3.year_week), "left"
).select("a.*", "b.promotion_id")

Use a.* to select all columns from one table and choose specific columns from the other table.