ChatGPT解决这个技术问题 Extra ChatGPT

向 Spark DataFrame 添加一个空列

如网络上的 many other locations 中所述,向现有 DataFrame 添加新列并不简单。不幸的是,拥有此功能很重要(即使它在分布式环境中效率低下),尤其是在尝试使用 unionAll 连接两个 DataFrame 时。

null 列添加到 DataFrame 以促进 unionAll 的最优雅的解决方法是什么?

我的版本是这样的:

from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(df_old['any_col_from_old']))

C
Community

您需要的只是文字和演员表:

from pyspark.sql.functions import lit

new_df = old_df.withColumn('new_column', lit(None).cast(StringType()))

一个完整的例子:

df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
df.printSchema()

## root
##  |-- foo: long (nullable = true)
##  |-- bar: string (nullable = true)

new_df = df.withColumn('new_column', lit(None).cast(StringType()))
new_df.printSchema()

## root
##  |-- foo: long (nullable = true)
##  |-- bar: string (nullable = true)
##  |-- new_column: string (nullable = true)

new_df.show()

## +---+---+----------+
## |foo|bar|new_column|
## +---+---+----------+
## |  1|  2|      null|
## |  2|  3|      null|
## +---+---+----------+

可在此处找到等效的 Scala:Create new Dataframe with empty/null field values


如果该列首先不存在,如何有条件地执行此操作?我正在尝试使用 UDF 并将 DF 传递给它,然后进行 new_column not in df.columns 检查,但无法使其工作。
我也看过它,但我仍然无法有条件地将它合并到 withColumn('blah', where(has_column(df['blah']) == False).... 类型的构造中。必须缺少一些语法结构。如果不存在,我想添加一个带有 Null 的列。这个答案是前者,另一个检查后者。
@Gopala df if has_column(df) else df.withColumn(....) - 没有特定的 Spark。
该死的......我真的很困惑python语法什么时候可以工作,什么时候不可以。例如,您不能在 withColumn 中包含条件代码,而必须使用 UDF。谢谢!
S
Shrikant Prabhu

我会将 lit(None) 转换为 NullType 而不是 StringType。因此,如果我们必须过滤掉该列上的非空行......它可以很容易地完成如下

df = sc.parallelize([Row(1, "2"), Row(2, "3")]).toDF()

new_df = df.withColumn('new_column', lit(None).cast(NullType()))

new_df.printSchema() 

df_null = new_df.filter(col("new_column").isNull()).show()
df_non_null = new_df.filter(col("new_column").isNotNull()).show()

如果要转换为 StringType,请注意不要使用 lit("None")(with quotes),因为它会在 col("new_column") 上搜索具有过滤条件 .isNull() 的记录时失败。


错误:Parquet data source does not support null data type.;StringType() 工作。
Z
ZygD

没有 import StringType 的选项

df = df.withColumn('foo', F.lit(None).cast('string'))

完整示例:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.range(1, 3).toDF('c')
df = df.withColumn('foo', F.lit(None).cast('string'))

df.printSchema()
#     root
#      |-- c: long (nullable = false)
#      |-- foo: string (nullable = true)

df.show()
#     +---+----+
#     |  c| foo|
#     +---+----+
#     |  1|null|
#     |  2|null|
#     +---+----+