假设我有一个 spark 数据框 df1
,有几列(其中列 id
)和数据框 df2
,有两列,id
和 other
。
有没有办法复制以下命令:
sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id")
仅使用 join()
、select()
等 pyspark 函数?
我必须在一个函数中实现这个连接,我不想被迫将 sqlContext 作为函数参数。
星号 (*
) 与别名一起使用。前任:
from pyspark.sql.functions import *
df1 = df1.alias('df1')
df2 = df2.alias('df2')
df1.join(df2, df1.id == df2.id).select('df1.*')
不确定是否是最有效的方法,但这对我有用:
from pyspark.sql.functions import col
df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])
诀窍在于:
[col('a.'+xx) for xx in a.columns] : all columns in a
[col('b.other1'),col('b.other2')] : some columns of b
不使用别名。
df1.join(df2, df1.id == df2.id).select(df1["*"],df2["other"])
df2
的列与 df1
的某些名称相同,在 select 方法中执行 df1["*"]
也会正确地从该数据框中获取列。您介意解释(或链接到文档)这是如何工作的吗?
这是一个不需要 SQL 上下文但维护 DataFrame 元数据的解决方案。
a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']]).toDF(["other", "b_id"])
c = a.join(b, a.a_id == b.b_id)
然后,c.show()
产生:
+----+-----+-----+----+
|a_id|extra|other|b_id|
+----+-----+-----+----+
| a| foo| p1| a|
| b| hem| p2| b|
| c| haw| p3| c|
+----+-----+-----+----+
我相信这将是最简单和最直观的方式:
final = (df1.alias('df1').join(df2.alias('df2'),
on = df1['id'] == df2['id'],
how = 'inner')
.select('df1.*',
'df2.other')
)
删除重复的 b_id
c = a.join(b, a.a_id == b.b_id).drop(b.b_id)
这是执行内部连接并从数据框中选择列并将同一列别名为不同列名的代码片段。
emp_df = spark.read.csv('Employees.csv', header =True);
dept_df = spark.read.csv('dept.csv', header =True)
emp_dept_df = emp_df.join(dept_df,'DeptID').select(emp_df['*'], dept_df['Name'].alias('DName'))
emp_df.show()
dept_df.show()
emp_dept_df.show()
Output for 'emp_df.show()':
+---+---------+------+------+
| ID| Name|Salary|DeptID|
+---+---------+------+------+
| 1| John| 20000| 1|
| 2| Rohit| 15000| 2|
| 3| Parth| 14600| 3|
| 4| Rishabh| 20500| 1|
| 5| Daisy| 34000| 2|
| 6| Annie| 23000| 1|
| 7| Sushmita| 50000| 3|
| 8| Kaivalya| 20000| 1|
| 9| Varun| 70000| 3|
| 10|Shambhavi| 21500| 2|
| 11| Johnson| 25500| 3|
| 12| Riya| 17000| 2|
| 13| Krish| 17000| 1|
| 14| Akanksha| 20000| 2|
| 15| Rutuja| 21000| 3|
+---+---------+------+------+
Output for 'dept_df.show()':
+------+----------+
|DeptID| Name|
+------+----------+
| 1| Sales|
| 2|Accounting|
| 3| Marketing|
+------+----------+
Join Output:
+---+---------+------+------+----------+
| ID| Name|Salary|DeptID| DName|
+---+---------+------+------+----------+
| 1| John| 20000| 1| Sales|
| 2| Rohit| 15000| 2|Accounting|
| 3| Parth| 14600| 3| Marketing|
| 4| Rishabh| 20500| 1| Sales|
| 5| Daisy| 34000| 2|Accounting|
| 6| Annie| 23000| 1| Sales|
| 7| Sushmita| 50000| 3| Marketing|
| 8| Kaivalya| 20000| 1| Sales|
| 9| Varun| 70000| 3| Marketing|
| 10|Shambhavi| 21500| 2|Accounting|
| 11| Johnson| 25500| 3| Marketing|
| 12| Riya| 17000| 2|Accounting|
| 13| Krish| 17000| 1| Sales|
| 14| Akanksha| 20000| 2|Accounting|
| 15| Rutuja| 21000| 3| Marketing|
+---+---------+------+------+----------+
我收到一个错误:使用建议的代码“未找到”:
from pyspark.sql.functions import col df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])
我将 a.columns
更改为 df1.columns
并成功了。
加入后删除重复列的功能。
核实
def dropDupeDfCols(df): newcols = [] dupcols = []
for i in range(len(df.columns)):
if df.columns[i] not in newcols:
newcols.append(df.columns[i])
else:
dupcols.append(i)
df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
df = df.drop(str(dupcol))
return df.toDF(*newcols)
我刚刚从 df2 中删除了我不需要的列并加入了:
sliced_df = df2.select(columns_of_interest)
df1.join(sliced_df, on=['id'], how='left')
**id should be in `columns_of_interest` tho
您可以只进行连接,然后选择想要的列 https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe%20join#pyspark.sql.DataFrame.join
不定期副业成功案例分享
df1 = df1.alias('df1')
和df2 = df2.alias('df2')
。这里的目的是什么?您将df1
重命名为df1
。这不是没用吗?