ChatGPT解决这个技术问题 Extra ChatGPT

加入两个数据框,从一个中选择所有列,从另一个中选择一些列

假设我有一个 spark 数据框 df1,有几列(其中列 id)和数据框 df2,有两列,idother

有没有办法复制以下命令:

sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id")

仅使用 join()select() 等 pyspark 函数?

我必须在一个函数中实现这个连接,我不想被迫将 sqlContext 作为函数参数。


c
cronoik

星号 (*) 与别名一起使用。前任:

from pyspark.sql.functions import *

df1 = df1.alias('df1')
df2 = df2.alias('df2')

df1.join(df2, df1.id == df2.id).select('df1.*')

完美——完整的解决方案; { df1.join(df2, df1.id == df2.id).select('df1.*', 'df2.other') }
您写了 df1 = df1.alias('df1')df2 = df2.alias('df2')。这里的目的是什么?您将 df1 重命名为 df1。这不是没用吗?
不知何故,这种方法对我来说不适用于 Spark 3。
R
Ramesh Maharjan

不确定是否是最有效的方法,但这对我有用:

from pyspark.sql.functions import col

df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])

诀窍在于:

[col('a.'+xx) for xx in a.columns] : all columns in a

[col('b.other1'),col('b.other2')] : some columns of b

在 spark2 中,我不得不将其更改为 col('b.id') == col('a.id') (带有两个等号)。否则,它会给我一个“语法错误:关键字不能是表达式”异常
嗨,我如何将多个列作为列表传递,而不是像这样 [col('b.other1'),col('b.other2')] 用于 df2 数据集的单个列
A
Akhilesh Bharadwaj

不使用别名。

df1.join(df2, df1.id == df2.id).select(df1["*"],df2["other"])

我注意到,当连接的数据框具有同名的列名时,即使 df2 的列与 df1 的某些名称相同,在 select 方法中执行 df1["*"] 也会正确地从该数据框中获取列。您介意解释(或链接到文档)这是如何工作的吗?
K
Katya Willard

这是一个不需要 SQL 上下文但维护 DataFrame 元数据的解决方案。

a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']]).toDF(["other", "b_id"])

c = a.join(b, a.a_id == b.b_id)

然后,c.show() 产生:

+----+-----+-----+----+
|a_id|extra|other|b_id|
+----+-----+-----+----+
|   a|  foo|   p1|   a|
|   b|  hem|   p2|   b|
|   c|  haw|   p3|   c|
+----+-----+-----+----+

好吧,OP 只要求选择几个列,即。过滤,答案包含连接后的所有列。
X
Xehron

我相信这将是最简单和最直观的方式:

final = (df1.alias('df1').join(df2.alias('df2'),
                               on = df1['id'] == df2['id'],
                               how = 'inner')
                         .select('df1.*',
                                 'df2.other')
)

S
Selvaraj S.

删除重复的 b_id

c = a.join(b, a.a_id == b.b_id).drop(b.b_id)

S
Sunil

这是执行内部连接并从数据框中选择列并将同一列别名为不同列名的代码片段。

emp_df  = spark.read.csv('Employees.csv', header =True);
dept_df = spark.read.csv('dept.csv', header =True)


emp_dept_df = emp_df.join(dept_df,'DeptID').select(emp_df['*'], dept_df['Name'].alias('DName'))
emp_df.show()
dept_df.show()
emp_dept_df.show()
Output  for 'emp_df.show()':

+---+---------+------+------+
| ID|     Name|Salary|DeptID|
+---+---------+------+------+
|  1|     John| 20000|     1|
|  2|    Rohit| 15000|     2|
|  3|    Parth| 14600|     3|
|  4|  Rishabh| 20500|     1|
|  5|    Daisy| 34000|     2|
|  6|    Annie| 23000|     1|
|  7| Sushmita| 50000|     3|
|  8| Kaivalya| 20000|     1|
|  9|    Varun| 70000|     3|
| 10|Shambhavi| 21500|     2|
| 11|  Johnson| 25500|     3|
| 12|     Riya| 17000|     2|
| 13|    Krish| 17000|     1|
| 14| Akanksha| 20000|     2|
| 15|   Rutuja| 21000|     3|
+---+---------+------+------+

Output  for 'dept_df.show()':
+------+----------+
|DeptID|      Name|
+------+----------+
|     1|     Sales|
|     2|Accounting|
|     3| Marketing|
+------+----------+

Join Output:
+---+---------+------+------+----------+
| ID|     Name|Salary|DeptID|     DName|
+---+---------+------+------+----------+
|  1|     John| 20000|     1|     Sales|
|  2|    Rohit| 15000|     2|Accounting|
|  3|    Parth| 14600|     3| Marketing|
|  4|  Rishabh| 20500|     1|     Sales|
|  5|    Daisy| 34000|     2|Accounting|
|  6|    Annie| 23000|     1|     Sales|
|  7| Sushmita| 50000|     3| Marketing|
|  8| Kaivalya| 20000|     1|     Sales|
|  9|    Varun| 70000|     3| Marketing|
| 10|Shambhavi| 21500|     2|Accounting|
| 11|  Johnson| 25500|     3| Marketing|
| 12|     Riya| 17000|     2|Accounting|
| 13|    Krish| 17000|     1|     Sales|
| 14| Akanksha| 20000|     2|Accounting|
| 15|   Rutuja| 21000|     3| Marketing|
+---+---------+------+------+----------+

f
filip stepniak

我收到一个错误:使用建议的代码“未找到”:

from pyspark.sql.functions import col df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])

我将 a.columns 更改为 df1.columns 并成功了。


更改变量的名称应该很明显。
a
aamirmalik124

加入后删除重复列的功能。

核实

def dropDupeDfCols(df): newcols = [] dupcols = []

for i in range(len(df.columns)):
    if df.columns[i] not in newcols:
        newcols.append(df.columns[i])
    else:
        dupcols.append(i)

df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
    df = df.drop(str(dupcol))

return df.toDF(*newcols)

J
Johan Khanye

我刚刚从 df2 中删除了我不需要的列并加入了:

sliced_df = df2.select(columns_of_interest)
df1.join(sliced_df, on=['id'], how='left')
**id should be in `columns_of_interest` tho

E
Erica

您可以只进行连接,然后选择想要的列 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe%20join#pyspark.sql.DataFrame.join


我的问题是如何从一个数据框中选择所有列(不一一列举)和另一列