我正在尝试将两个 PySpark 数据帧与仅在其中一个上的一些列连接起来:
from pyspark.sql.functions import randn, rand
df_1 = sqlContext.range(0, 10)
+--+
|id|
+--+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+--+
df_2 = sqlContext.range(11, 20)
+--+
|id|
+--+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+--+
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal_2"))
现在我想生成第三个数据框。我想要熊猫concat
之类的东西:
df_1.show()
+---+--------------------+--------------------+
| id| uniform| normal|
+---+--------------------+--------------------+
| 0| 0.8122802274304282| 1.2423430583597714|
| 1| 0.8642043127063618| 0.3900018344856156|
| 2| 0.8292577771850476| 1.8077401259195247|
| 3| 0.198558705368724| -0.4270585782850261|
| 4|0.012661361966674889| 0.702634599720141|
| 5| 0.8535692890157796|-0.42355804115129153|
| 6| 0.3723296190171911| 1.3789648582622995|
| 7| 0.9529794127670571| 0.16238718777444605|
| 8| 0.9746632635918108| 0.02448061333761742|
| 9| 0.513622008243935| 0.7626741803250845|
+---+--------------------+--------------------+
df_2.show()
+---+--------------------+--------------------+
| id| uniform| normal_2|
+---+--------------------+--------------------+
| 11| 0.3221262660507942| 1.0269298899109824|
| 12| 0.4030672316912547| 1.285648175568798|
| 13| 0.9690555459609131|-0.22986601831364423|
| 14|0.011913836266515876| -0.678915153834693|
| 15| 0.9359607054250594|-0.16557488664743034|
| 16| 0.45680471157575453| -0.3885563551710555|
| 17| 0.6411908952297819| 0.9161177183227823|
| 18| 0.5669232696934479| 0.7270125277020573|
| 19| 0.513622008243935| 0.7626741803250845|
+---+--------------------+--------------------+
#do some concatenation here, how?
df_concat.show()
| id| uniform| normal| normal_2 |
+---+--------------------+--------------------+------------+
| 0| 0.8122802274304282| 1.2423430583597714| None |
| 1| 0.8642043127063618| 0.3900018344856156| None |
| 2| 0.8292577771850476| 1.8077401259195247| None |
| 3| 0.198558705368724| -0.4270585782850261| None |
| 4|0.012661361966674889| 0.702634599720141| None |
| 5| 0.8535692890157796|-0.42355804115129153| None |
| 6| 0.3723296190171911| 1.3789648582622995| None |
| 7| 0.9529794127670571| 0.16238718777444605| None |
| 8| 0.9746632635918108| 0.02448061333761742| None |
| 9| 0.513622008243935| 0.7626741803250845| None |
| 11| 0.3221262660507942| None | 0.123 |
| 12| 0.4030672316912547| None |0.12323 |
| 13| 0.9690555459609131| None |0.123 |
| 14|0.011913836266515876| None |0.18923 |
| 15| 0.9359607054250594| None |0.99123 |
| 16| 0.45680471157575453| None |0.123 |
| 17| 0.6411908952297819| None |1.123 |
| 18| 0.5669232696934479| None |0.10023 |
| 19| 0.513622008243935| None |0.916332123 |
+---+--------------------+--------------------+------------+
那可能吗?
也许您可以尝试创建不存在的列并调用 union
(unionAll
用于 Spark 1.6 或更低版本):
from pyspark.sql.functions import lit
cols = ['id', 'uniform', 'normal', 'normal_2']
df_1_new = df_1.withColumn("normal_2", lit(None)).select(cols)
df_2_new = df_2.withColumn("normal", lit(None)).select(cols)
result = df_1_new.union(df_2_new)
df_concat = df_1.union(df_2)
数据框可能需要具有相同的列,在这种情况下,您可以使用 withColumn()
创建 normal_1
和 normal_2
Also as standard in SQL, this function resolves columns by position (not by name).
您可以使用 unionByName 来做到这一点:
df = df_1.unionByName(df_2)
unionByName 从 Spark 2.3.0 开始可用。
pyspark.sql.utils.AnalysisException: 'Cannot resolve column name "normal" among (id, uniform, normal_2);'
select
对列进行重新排序。谢谢!
unionByName 是 spark 中的一个内置选项,可从 spark 2.3.0 获得。
在 spark 版本 3.1.0 中,有默认值设置为 False 的 allowMissingColumns 选项来处理缺失的列。即使两个数据框都没有相同的列集,此函数也可以工作,在结果数据框中将缺失的列值设置为 null。
df_1.unionByName(df_2, allowMissingColumns=True).show()
+---+--------------------+--------------------+--------------------+
| id| uniform| normal| normal_2|
+---+--------------------+--------------------+--------------------+
| 0| 0.8122802274304282| 1.2423430583597714| null|
| 1| 0.8642043127063618| 0.3900018344856156| null|
| 2| 0.8292577771850476| 1.8077401259195247| null|
| 3| 0.198558705368724| -0.4270585782850261| null|
| 4|0.012661361966674889| 0.702634599720141| null|
| 5| 0.8535692890157796|-0.42355804115129153| null|
| 6| 0.3723296190171911| 1.3789648582622995| null|
| 7| 0.9529794127670571| 0.16238718777444605| null|
| 8| 0.9746632635918108| 0.02448061333761742| null|
| 9| 0.513622008243935| 0.7626741803250845| null|
| 11| 0.3221262660507942| null| 1.0269298899109824|
| 12| 0.4030672316912547| null| 1.285648175568798|
| 13| 0.9690555459609131| null|-0.22986601831364423|
| 14|0.011913836266515876| null| -0.678915153834693|
| 15| 0.9359607054250594| null|-0.16557488664743034|
| 16| 0.45680471157575453| null| -0.3885563551710555|
| 17| 0.6411908952297819| null| 0.9161177183227823|
| 18| 0.5669232696934479| null| 0.7270125277020573|
| 19| 0.513622008243935| null| 0.7626741803250845|
+---+--------------------+--------------------+--------------------+
为了更通用地将两列都保留在 df1
和 df2
中:
import pyspark.sql.functions as F
# Keep all columns in either df1 or df2
def outter_union(df1, df2):
# Add missing columns to df1
left_df = df1
for column in set(df2.columns) - set(df1.columns):
left_df = left_df.withColumn(column, F.lit(None))
# Add missing columns to df2
right_df = df2
for column in set(df1.columns) - set(df2.columns):
right_df = right_df.withColumn(column, F.lit(None))
# Make sure columns are ordered the same
return left_df.union(right_df.select(left_df.columns))
要将多个 pyspark 数据帧连接成一个:
from functools import reduce
reduce(lambda x,y:x.union(y), [df_1,df_2])
您可以将 [df_1, df_2] 的列表替换为任意长度的列表。
这是一种方法,以防它仍然有用:我在 pyspark shell 中运行它,Python 版本 2.7.12,我的 Spark 安装版本是 2.0.1。
PS:我猜你的意思是为 df_1 df_2 使用不同的种子,下面的代码反映了这一点。
from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand
import pyspark.sql.functions as F
df_1 = sqlContext.range(0, 10)
df_2 = sqlContext.range(11, 20)
df_1 = df_1.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal"))
df_2 = df_2.select("id", rand(seed=11).alias("uniform"), randn(seed=28).alias("normal_2"))
def get_uniform(df1_uniform, df2_uniform):
if df1_uniform:
return df1_uniform
if df2_uniform:
return df2_uniform
u_get_uniform = F.udf(get_uniform, FloatType())
df_3 = df_1.join(df_2, on = "id", how = 'outer').select("id", u_get_uniform(df_1["uniform"], df_2["uniform"]).alias("uniform"), "normal", "normal_2").orderBy(F.col("id"))
这是我得到的输出:
df_1.show()
+---+-------------------+--------------------+
| id| uniform| normal|
+---+-------------------+--------------------+
| 0|0.41371264720975787| 0.5888539012978773|
| 1| 0.7311719281896606| 0.8645537008427937|
| 2| 0.1982919638208397| 0.06157382353970104|
| 3|0.12714181165849525| 0.3623040918178586|
| 4| 0.7604318153406678|-0.49575204523675975|
| 5|0.12030715258495939| 1.0854146699817222|
| 6|0.12131363910425985| -0.5284523629183004|
| 7|0.44292918521277047| -0.4798519469521663|
| 8| 0.8898784253886249| -0.8820294772950535|
| 9|0.03650707717266999| -2.1591956435415334|
+---+-------------------+--------------------+
df_2.show()
+---+-------------------+--------------------+
| id| uniform| normal_2|
+---+-------------------+--------------------+
| 11| 0.1982919638208397| 0.06157382353970104|
| 12|0.12714181165849525| 0.3623040918178586|
| 13|0.12030715258495939| 1.0854146699817222|
| 14|0.12131363910425985| -0.5284523629183004|
| 15|0.44292918521277047| -0.4798519469521663|
| 16| 0.8898784253886249| -0.8820294772950535|
| 17| 0.2731073068483362|-0.15116027592854422|
| 18| 0.7784518091224375| -0.3785563841011868|
| 19|0.43776394586845413| 0.47700719174464357|
+---+-------------------+--------------------+
df_3.show()
+---+-----------+--------------------+--------------------+
| id| uniform| normal| normal_2|
+---+-----------+--------------------+--------------------+
| 0| 0.41371265| 0.5888539012978773| null|
| 1| 0.7311719| 0.8645537008427937| null|
| 2| 0.19829196| 0.06157382353970104| null|
| 3| 0.12714182| 0.3623040918178586| null|
| 4| 0.7604318|-0.49575204523675975| null|
| 5|0.120307155| 1.0854146699817222| null|
| 6| 0.12131364| -0.5284523629183004| null|
| 7| 0.44292918| -0.4798519469521663| null|
| 8| 0.88987845| -0.8820294772950535| null|
| 9|0.036507078| -2.1591956435415334| null|
| 11| 0.19829196| null| 0.06157382353970104|
| 12| 0.12714182| null| 0.3623040918178586|
| 13|0.120307155| null| 1.0854146699817222|
| 14| 0.12131364| null| -0.5284523629183004|
| 15| 0.44292918| null| -0.4798519469521663|
| 16| 0.88987845| null| -0.8820294772950535|
| 17| 0.27310732| null|-0.15116027592854422|
| 18| 0.7784518| null| -0.3785563841011868|
| 19| 0.43776396| null| 0.47700719174464357|
+---+-----------+--------------------+--------------------+
pyspark.sql.functions.monotonically_increasing_id())
向每个 df 添加一个索引 col,然后对该列进行连接。 monotonically_increasing_id 不保证从 0 开始,也不保证使用连续的整数。
上面的答案非常优雅。我很久以前就写过这个函数,我也在努力将两个具有不同列的数据帧连接起来。
假设您有数据框 sdf1 和 sdf2
from pyspark.sql import functions as F
from pyspark.sql.types import *
def unequal_union_sdf(sdf1, sdf2):
s_df1_schema = set((x.name, x.dataType) for x in sdf1.schema)
s_df2_schema = set((x.name, x.dataType) for x in sdf2.schema)
for i,j in s_df2_schema.difference(s_df1_schema):
sdf1 = sdf1.withColumn(i,F.lit(None).cast(j))
for i,j in s_df1_schema.difference(s_df2_schema):
sdf2 = sdf2.withColumn(i,F.lit(None).cast(j))
common_schema_colnames = sdf1.columns
sdk = \
sdf1.select(common_schema_colnames).union(sdf2.select(common_schema_colnames))
return sdk
sdf_concat = unequal_union_sdf(sdf1, sdf2)
这应该为你做......
from pyspark.sql.types import FloatType
from pyspark.sql.functions import randn, rand, lit, coalesce, col
import pyspark.sql.functions as F
df_1 = sqlContext.range(0, 6)
df_2 = sqlContext.range(3, 10)
df_1 = df_1.select("id", lit("old").alias("source"))
df_2 = df_2.select("id")
df_1.show()
df_2.show()
df_3 = df_1.alias("df_1").join(df_2.alias("df_2"), df_1.id == df_2.id, "outer")\
.select(\
[coalesce(df_1.id, df_2.id).alias("id")] +\
[col("df_1." + c) for c in df_1.columns if c != "id"])\
.sort("id")
df_3.show()
我试图在 pyspark 中实现 pandas 附加功能,并且我创建了一个自定义函数,我们可以在其中连接 2 个或更多数据框,即使它们的编号不同。列的唯一条件是如果数据框具有相同的名称,那么它们的数据类型应该相同/匹配。
我编写了一个自定义函数来合并 2 个数据帧。
def append_dfs(df1,df2):
list1 = df1.columns
list2 = df2.columns
for col in list2:
if(col not in list1):
df1 = df1.withColumn(col, F.lit(None))
for col in list1:
if(col not in list2):
df2 = df2.withColumn(col, F.lit(None))
return df1.unionByName(df2)
用法:
连接 2 个数据帧 final_df = append_dfs(df1,df2) 连接超过 2(say3) 个数据帧 final_df = append_dfs(append_dfs(df1,df2),df3)
例子:
df1:
https://i.stack.imgur.com/8vqGe.png
df2:
https://i.stack.imgur.com/Zkn7d.png
结果=追加_dfs(df1,df2)
结果 :
https://i.stack.imgur.com/ulWHJ.png
希望这会有用。
我会以这种方式解决这个问题:
from pyspark.sql import SparkSession
df_1.createOrReplaceTempView("tab_1")
df_2.createOrReplaceTempView("tab_2")
df_concat=spark.sql("select tab_1.id,tab_1.uniform,tab_1.normal,tab_2.normal_2 from tab_1 tab_1 left join tab_2 tab_2 on tab_1.uniform=tab_2.uniform\
union\
select tab_2.id,tab_2.uniform,tab_1.normal,tab_2.normal_2 from tab_2 tab_2 left join tab_1 tab_1 on tab_1.uniform=tab_2.uniform")
df_concat.show()
也许,你想连接更多的两个数据框。我发现了一个使用 pandas Dataframe 转换的问题。
假设您有 3 个想要连接的 spark Dataframe。
代码如下:
list_dfs = []
list_dfs_ = []
df = spark.read.json('path_to_your_jsonfile.json',multiLine = True)
df2 = spark.read.json('path_to_your_jsonfile2.json',multiLine = True)
df3 = spark.read.json('path_to_your_jsonfile3.json',multiLine = True)
list_dfs.extend([df,df2,df3])
for df in list_dfs :
df = df.select([column for column in df.columns]).toPandas()
list_dfs_.append(df)
list_dfs.clear()
df_ = sqlContext.createDataFrame(pd.concat(list_dfs_))
不定期副业成功案例分享
unionAll()
在 spark 2.0 中已弃用。改用union()
withColumnRenamed
spark.apache.org/docs/latest/api/python/… 重命名result.dropDuplicates()
删除重复项。