
Add an empty column to Spark DataFrame

As mentioned in many other places on the web, adding a new column to an existing DataFrame is not straightforward. Unfortunately this functionality is important to have (even though it is inefficient in a distributed environment), especially when trying to concatenate two DataFrames using unionAll.

What is the most elegant workaround for adding a null column to a DataFrame to facilitate a unionAll?

My version goes like this:

from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction

# A UDF that ignores its input and always returns None
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(old_df['any_col_from_old']))

Community

All you need here is a literal and cast:

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

new_df = old_df.withColumn('new_column', lit(None).cast(StringType()))

A full example:

from pyspark.sql import Row

row = Row("foo", "bar")  # named fields so toDF() picks up the column names
df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
df.printSchema()

## root
##  |-- foo: long (nullable = true)
##  |-- bar: string (nullable = true)

new_df = df.withColumn('new_column', lit(None).cast(StringType()))
new_df.printSchema()

## root
##  |-- foo: long (nullable = true)
##  |-- bar: string (nullable = true)
##  |-- new_column: string (nullable = true)

new_df.show()

## +---+---+----------+
## |foo|bar|new_column|
## +---+---+----------+
## |  1|  2|      null|
## |  2|  3|      null|
## +---+---+----------+

A Scala equivalent can be found here: Create new Dataframe with empty/null field values
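Tying this back to the unionAll motivation in the question: a minimal sketch, assuming a live sqlContext and hypothetical inputs df1 and df2, of padding the missing column with nulls so the two schemas line up. Note that unionAll matches columns by position, not by name, so the column order must agree.

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# Hypothetical inputs: df1 has an 'extra' column that df2 lacks
df1 = sqlContext.createDataFrame([(1, "a", "x")], ["foo", "bar", "extra"])
df2 = sqlContext.createDataFrame([(2, "b")], ["foo", "bar"])

# Pad df2 with a null 'extra' column so both schemas match,
# then concatenate the rows by column position
combined = df1.unionAll(df2.withColumn("extra", lit(None).cast(StringType())))
combined.show()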


How do I do this conditionally, if the column does not exist in the first place? I am trying to use a UDF, pass the DataFrame to it, and then do a new_column not in df.columns check, but I can't make it work.
I looked at that too, but I was still not able to incorporate the check into a withColumn('blah', where(has_column(df['blah']) == False).... kind of construct. I must be missing some syntactic construct. I want to add a column with nulls if it does not exist. This answer does the former; the other one checks the latter.
@Gopala df if has_column(df) else df.withColumn(....) - nothing Spark-specific.
Darn... I get really mixed up about when plain Python syntax works and when it does not. For example, you can't put conditional code inside withColumn and have to use UDFs instead. Thank you!
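To make the df if has_column(df) else ... pattern from these comments concrete, a minimal sketch (the helper name add_column_if_missing is made up here): the branching is ordinary Python evaluated on the driver, so no UDF is needed.

from pyspark.sql.functions import lit

def add_column_if_missing(df, name, dtype='string'):
    # Plain Python if/else over df.columns, evaluated on the driver
    # before any Spark plan is built; no UDF required
    if name in df.columns:
        return df
    return df.withColumn(name, lit(None).cast(dtype))

df = add_column_if_missing(df, 'blah')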
Shrikant Prabhu

I would cast lit(None) to NullType instead of StringType, so that if we ever have to filter out non-null rows on that column, it can easily be done as follows:

from pyspark.sql import Row
from pyspark.sql.functions import lit, col
from pyspark.sql.types import NullType

df = sc.parallelize([Row(1, "2"), Row(2, "3")]).toDF()

new_df = df.withColumn('new_column', lit(None).cast(NullType()))

new_df.printSchema()

df_null = new_df.filter(col("new_column").isNull())
df_null.show()
df_non_null = new_df.filter(col("new_column").isNotNull())
df_non_null.show()

Also be careful not to use lit("None") (with quotes) if you are casting to StringType, since the filter condition .isNull() on col("new_column") would then match no records.
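A small sketch of that pitfall, reusing df from above: lit('None') stores the four-character string "None", which is not a SQL NULL, so .isNull() matches nothing.

from pyspark.sql.functions import lit, col

# lit('None') is a real string value, not a NULL
bad = df.withColumn('new_column', lit('None').cast('string'))
bad.filter(col('new_column').isNull()).count()    # 0

# lit(None) is a true SQL NULL, which isNull() does match
good = df.withColumn('new_column', lit(None).cast('string'))
good.filter(col('new_column').isNull()).count()   # equals df.count()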


Error: Parquet data source does not support null data type. StringType() worked.
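A sketch of that caveat (the output path is hypothetical): a column of NullType cannot be written to Parquet, so cast the null literal to a concrete type first.

from pyspark.sql.functions import lit
from pyspark.sql.types import NullType, StringType

# Writing a NullType column raises:
# AnalysisException: Parquet data source does not support null data type.
# df.withColumn('new_column', lit(None).cast(NullType())).write.parquet('/tmp/out')

# Casting to a concrete type such as StringType writes fine
# ('/tmp/out' is a hypothetical path)
df.withColumn('new_column', lit(None).cast(StringType())).write.parquet('/tmp/out')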
ZygD

The option without importing StringType:

df = df.withColumn('foo', F.lit(None).cast('string'))

Full example:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.range(1, 3).toDF('c')
df = df.withColumn('foo', F.lit(None).cast('string'))

df.printSchema()
#     root
#      |-- c: long (nullable = false)
#      |-- foo: string (nullable = true)

df.show()
#     +---+----+
#     |  c| foo|
#     +---+----+
#     |  1|null|
#     |  2|null|
#     +---+----+