ChatGPT解决这个技术问题 Extra ChatGPT

在 Spark 中的 DF 连接后删除重复的列

当您加入具有相似列名的两个 DF 时:

df = df1.join(df2, df1['id'] == df2['id'])

加入工作正常,但您不能调用 id 列,因为它不明确,您会得到以下异常:

pyspark.sql.utils.AnalysisException:“参考 'id' 不明确,可能是:id#5691, id#5918.;”

这使得 id 不再可用...

下面的函数解决了这个问题:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df

我不喜欢它的是我必须遍历列名并将它们删除为什么。这看起来真的很笨拙...

您是否知道任何其他解决方案可以更优雅地加入和删除重复项或删除多个列而不迭代每个列?

标记答案会帮助别人。

P
Psidom

如果两个数据框的连接列具有相同的名称并且您只需要 equi 连接,则可以将连接列指定为列表,在这种情况下,结果将只保留连接列之一:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+

df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+

df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

否则,您需要为连接数据框提供别名,并稍后通过别名引用重复的列:

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

一个简单而优雅的解决方案 :) 现在,如果您想从 alias = a 中选择所有列并从 alias = b 中选择一个列,您还可以使用 SQL 语法,如 .select("a.*", "b.val2")
j
jerrytim

df.join(other, on, how)on 是列名字符串或列名字符串列表时,返回的数据框将防止出现重复列。当 on 是一个连接表达式时,它会导致重复的列。我们可以使用 .drop(df.a) 删除重复的列。例子:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)

那是……不直观(不同的行为取决于 on 的形式)。但很高兴知道 - 谢谢。
这个解决方案对我不起作用(在 Spark 3 中)。尝试使用这样的引用删除列时,出现错误:each col in the param list should be a string
在 spark 3 中同样对我不起作用......
H
Heapify

假设 'a' 是具有列 'id' 的数据框,而 'b' 是另一个具有列 'id' 的数据框

我使用以下两种方法来删除重复项:

方法 1:使用字符串连接表达式而不是布尔表达式。这会自动为您删除重复的列

a.join(b, 'id')

方法 2:在加入之前重命名列并在之后删除它

b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id)

h
hussam

下面的代码适用于 Spark 1.6.0 及更高版本。

salespeople_df.show()
+---+------+-----+
|Num|  Name|Store|
+---+------+-----+
|  1| Henry|  100|
|  2| Karen|  100|
|  3|  Paul|  101|
|  4| Jimmy|  102|
|  5|Janice|  103|
+---+------+-----+

storeaddress_df.show()
+-----+--------------------+
|Store|             Address|
+-----+--------------------+
|  100|    64 E Illinos Ave|
|  101|         74 Grand Pl|
|  102|          2298 Hwy 7|
|  103|No address available|
+-----+--------------------+

假设 - 在这个例子中 - 共享列的名称是相同的:

joined=salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()

+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|    64 E Illinos Ave|
|  100|  2| Karen|    64 E Illinos Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+

.join 将防止共享列的重复。

假设您要在此示例中删除列 Num,您可以只使用 .drop('colname')

joined=joined.drop('Num')
joined.show()

+-----+------+--------------------+
|Store|  Name|             Address|
+-----+------+--------------------+
|  103|Janice|No address available|
|  100| Henry|    64 E Illinos Ave|
|  100| Karen|    64 E Illinos Ave|
|  101|  Paul|         74 Grand Pl|
|  102| Jimmy|          2298 Hwy 7|
+-----+------+--------------------+

Q
QA Collective

将多个表连接在一起后,我通过一个简单的函数运行它们,如果在从左到右行走时遇到重复项,则将列删除到 DF 中。或者,you could rename these columns too

其中 Names 是包含列 ['Id', 'Name', 'DateId', 'Description'] 的表,Dates 是包含列 ['Id', 'Date', 'Description'] 的表,列 IdDescription 将在连接后重复。

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")

其中 dropDupeDfCols 定义为:

def dropDupeDfCols(df):
    newcols = []
    dupcols = []

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)

生成的数据框将包含列 ['Id', 'Name', 'DateId', 'Description', 'Date']


S
Santosh Kumar

在我的情况下,我有一个数据框,在连接后有多个重复的列,我试图以 csv 格式使用相同的数据框,但由于重复的列,我得到了错误。我按照以下步骤删除重复的列。代码在 scala 中

1) Rename all the duplicate columns and make new dataframe 2) make separate list for all the renamed columns 3) Make new dataframe with all columns (including renamed - step 1) 4) drop all the renamed column

private def removeDuplicateColumns(dataFrame:DataFrame): DataFrame = {
var allColumns:  mutable.MutableList[String] = mutable.MutableList()
val dup_Columns: mutable.MutableList[String] = mutable.MutableList()
dataFrame.columns.foreach((i: String) =>{
if(allColumns.contains(i))

if(allColumns.contains(i))
{allColumns += "dup_" + i
dup_Columns += "dup_" +i
}else{
allColumns += i
}println(i)
})
val columnSeq = allColumns.toSeq
val df = dataFrame.toDF(columnSeq:_*)
val unDF = df.drop(dup_Columns:_*)
unDF
}

to call the above function use below code and pass your dataframe which contains duplicate columns

val uniColDF = removeDuplicateColumns(df)

谢谢这个解决方案有效!虽然是一些小的语法错误。也不要忘记导入: import org.apache.spark.sql.DataFrame import scala.collection.mutable
J
Jeremy Caney

这是删除重复列的简单解决方案

final_result=df1.join(df2,(df1['subjectid']==df2['subjectid']),"left").drop(df1['subjectid'])

A
Anthony Awuley

如果您加入列表或字符串,则会自动删除 dup cols]1 这是一个 scala 解决方案,您可以将相同的想法翻译成任何语言

// get a list of duplicate columns or use a list/seq 
// of columns you would like to join on (note that this list
// should include columns for which you do not want duplicates)
val duplicateCols = df1.columns.intersect(df2.columns) 

// no duplicate columns in resulting DF
df1.join(df2, duplicateCols.distinct.toSet)

M
Mohana B C

this 答案的 Spark SQL 版本:

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
spark.sql("select * from t1 inner join t2 using (id)").show()

# +---+----+----+
# | id|val1|val2|
# +---+----+----+
# |  1|   2|   2|
# |  1|   2|   3|
# |  2|   3|   4|
# +---+----+----+

J
Jha Ayush

当多个列用于连接并且需要删除多个非字符串类型的列时,这对我有用。

final_data = mdf1.alias("a").join(df3.alias("b")
            (mdf1.unique_product_id==df3.unique_product_id) &
            (mdf1.year_week==df3.year_week) ,"left" ).select("a.*","b.promotion_id")

给出 a.* 以从一个表中选择所有列,并从另一个表中选择特定列。


关注公众号,不定期副业成功案例分享
关注公众号

不定期副业成功案例分享

领先一步获取最新的外包任务吗?

立即订阅