
PySpark: withColumn() with two conditions and three outcomes

I am working with Spark and PySpark. I am trying to achieve a result equivalent to the following pseudocode:

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)

I am trying to do this in PySpark but I'm not sure about the syntax. Any pointers? I looked into expr() but couldn't get it to work.

Note that df is a pyspark.sql.dataframe.DataFrame.


zero323

There are a few efficient ways to implement this. Let's start with the required imports:

from pyspark.sql.functions import col, expr, when

You can use the Hive IF function inside expr:

new_column_1 = expr(
    """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
)

or when + otherwise:

new_column_2 = when(
    col("fruit1").isNull() | col("fruit2").isNull(), 3
).when(col("fruit1") == col("fruit2"), 1).otherwise(0)
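To illustrate how the when chain above is evaluated: the first branch whose condition is true wins, and a NULL condition counts as not true. The following is a plain-Python sketch, not Spark code. It assumes None stands in for SQL NULL, and sql_eq and case_when are hypothetical helpers, not PySpark functions.

```python
# Plain-Python model of SQL semantics (not Spark code): None stands in for NULL.

def sql_eq(a, b):
    """SQL '=': NULL if either operand is NULL, otherwise a boolean."""
    if a is None or b is None:
        return None
    return a == b


def case_when(branches, otherwise):
    """Return the value of the first branch whose condition is exactly True.

    A NULL (None) condition is treated like False, as in SQL CASE WHEN.
    """
    for condition, value in branches:
        if condition is True:
            return value
    return otherwise


def new_column_2(fruit1, fruit2):
    # Mirrors when(isNull | isNull, 3).when(fruit1 == fruit2, 1).otherwise(0)
    return case_when(
        [
            (fruit1 is None or fruit2 is None, 3),
            (sql_eq(fruit1, fruit2), 1),
        ],
        otherwise=0,
    )


rows = [("orange", "apple"), ("kiwi", None), (None, "banana"),
        ("mango", "mango"), (None, None)]
print([new_column_2(f1, f2) for f1, f2 in rows])  # [0, 3, 3, 1, 3]
```

The printed list matches the new_column_2 column in the result table.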

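Finally, you could use the following trick. The comparison fruit1 == fruit2 is NULL when either side is NULL; casting the boolean to int turns true and false into 1 and 0; coalesce then replaces the remaining NULL with 3: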

from pyspark.sql.functions import coalesce, lit

new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))
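A plain-Python model of the coalesce trick can help check the NULL handling on the example rows without a cluster. This is a sketch, assuming None stands in for SQL NULL; sql_eq, cast_int and coalesce here are hypothetical helpers that only mimic the Spark expressions, not PySpark API.

```python
# Plain-Python model (not Spark code) of
# coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3)); None = NULL.

def sql_eq(a, b):
    """SQL '=': NULL when either side is NULL, otherwise a boolean."""
    if a is None or b is None:
        return None
    return a == b


def cast_int(value):
    """CAST(boolean AS INT): True -> 1, False -> 0, NULL stays NULL."""
    return None if value is None else int(value)


def coalesce(*values):
    """First non-NULL argument, or NULL if all are NULL."""
    for value in values:
        if value is not None:
            return value
    return None


def new_column_3(fruit1, fruit2):
    return coalesce(cast_int(sql_eq(fruit1, fruit2)), 3)


rows = [("orange", "apple"), ("kiwi", None), (None, "banana"),
        ("mango", "mango"), (None, None)]
print([new_column_3(f1, f2) for f1, f2 in rows])  # [0, 3, 3, 1, 3]
```

The trick works because the comparison is the only source of NULL here: it is NULL exactly when one of the inputs is NULL, so 0 (a non-NULL value) is kept and only the NULL cases become 3.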

With example data:

df = sc.parallelize([
    ("orange", "apple"), ("kiwi", None), (None, "banana"), 
    ("mango", "mango"), (None, None)
]).toDF(["fruit1", "fruit2"])

you can use this as follows:

(df
    .withColumn("new_column_1", new_column_1)
    .withColumn("new_column_2", new_column_2)
    .withColumn("new_column_3", new_column_3)
    .show())

and the result is:

+------+------+------------+------------+------------+
|fruit1|fruit2|new_column_1|new_column_2|new_column_3|
+------+------+------------+------------+------------+
|orange| apple|           0|           0|           0|
|  kiwi|  null|           3|           3|           3|
|  null|banana|           3|           3|           3|
| mango| mango|           1|           1|           1|
|  null|  null|           3|           3|           3|
+------+------+------------+------------+------------+

In Spark 2.2+, the function 'col' did not work for me; using the column names directly, without quotes, worked. For example: new_column_1 = expr("col_1 + int(col_2/15)")
David

You'll want to use a UDF, as below:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def func(fruit1, fruit2):
    # Spark NULLs arrive in a Python UDF as None; check them first
    if fruit1 is None or fruit2 is None:
        return 3
    if fruit1 == fruit2:
        return 1
    return 0

# Wrap the Python function as a Spark UDF returning an integer column
func_udf = udf(func, IntegerType())
df = df.withColumn('new_column', func_udf(df['fruit1'], df['fruit2']))
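Because func is plain Python, it can be checked on the example rows before wrapping it in udf. The order of the checks matters in Python: None == None is True, so a variant that compares first (func_equality_first, a hypothetical name used only for contrast) returns 1 instead of 3 when both fruits are null. A minimal sketch:

```python
# The UDF body is plain Python, so it can be checked without Spark.
# func_equality_first is a hypothetical variant used only for comparison.

def func(fruit1, fruit2):
    # Null check first: Spark NULLs arrive in a Python UDF as None.
    if fruit1 is None or fruit2 is None:
        return 3
    if fruit1 == fruit2:
        return 1
    return 0


def func_equality_first(fruit1, fruit2):
    # Same checks in the opposite order; None == None is True in Python.
    if fruit1 == fruit2:
        return 1
    if fruit1 is None or fruit2 is None:
        return 3
    return 0


rows = [("orange", "apple"), ("kiwi", None), (None, "banana"),
        ("mango", "mango"), (None, None)]
print([func(f1, f2) for f1, f2 in rows])                 # [0, 3, 3, 1, 3]
print([func_equality_first(f1, f2) for f1, f2 in rows])  # [0, 3, 3, 1, 1]
```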

I got a couple of errors from this solution, @David. The first one was solved with from pyspark.sql.types import StringType. The second one is TypeError: 'int' object is not callable, which I'm not sure how to resolve. Note that df is a pyspark.sql.dataframe.DataFrame.
@user2205916 I had a couple of typos. In the line def func(... I had fruit 1 (with a space) instead of fruit1. In the line starting func_udf =... I had StringType instead of IntegerType. Try it with the updated code and let me know if you still have issues.
I get the same error message. Also, I think a parenthesis is missing at the end of df = . . .
Ugh, another typo: the second-to-last line should be func_udf = udf(func, IntegerType()).
Have to run, but this is close (typos notwithstanding). If it still isn't working, make sure you don't have a situation like this: stackoverflow.com/questions/9767391/…
Nidhi

The withColumn function in PySpark lets you create a new column from conditions: add the when and otherwise functions and you have a working if-then-else structure.

For all of this you need to import the Spark SQL functions module; the code below will not work without the col() function.

In the first part, we declare a new column, 'new_column', and give the condition inside when (fruit1 == fruit2), returning 1 if it is true. If it is not true, control passes to otherwise, which handles the second condition (fruit1 or fruit2 is null) with the isNull() function: if that is true, 3 is returned; if it is false, the inner otherwise gives 0. Rows with a null fruit never match the first condition, because comparing NULL with anything yields NULL, which when does not treat as true.

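from pyspark.sql import functions as F

# Equal fruits -> 1; otherwise fall through to the nested when:
# any null fruit -> 3, anything else -> 0.
# otherwise() can only be applied once per when() chain, so the
# final 0 belongs to the inner when.
df = df.withColumn('new_column',
    F.when(F.col('fruit1') == F.col('fruit2'), 1)
    .otherwise(
        F.when(F.col('fruit1').isNull() | F.col('fruit2').isNull(), 3)
        .otherwise(0)))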
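Putting the equality check first is safe in Spark, unlike a plain Python == (where None == None is True), because comparing NULL with anything yields NULL and when only takes a branch whose condition is true. A plain-Python sketch of the nested when/otherwise, assuming None stands in for NULL; sql_eq and when_otherwise are hypothetical helpers, not PySpark API:

```python
# Plain-Python model (not Spark code) of the nested when/otherwise:
# when(fruit1 == fruit2, 1).otherwise(when(isNull | isNull, 3).otherwise(0)).
# None stands in for SQL NULL.

def sql_eq(a, b):
    """SQL '=': NULL when either side is NULL, otherwise a boolean."""
    if a is None or b is None:
        return None
    return a == b


def when_otherwise(condition, value, otherwise):
    """One CASE WHEN branch: only an exactly-True condition selects value."""
    return value if condition is True else otherwise


def new_column(fruit1, fruit2):
    # Inner when: any null fruit -> 3, otherwise 0
    inner = when_otherwise(fruit1 is None or fruit2 is None, 3, 0)
    # Outer when: a NULL comparison is not True, so null rows fall through
    return when_otherwise(sql_eq(fruit1, fruit2), 1, inner)


rows = [("orange", "apple"), ("kiwi", None), (None, "banana"),
        ("mango", "mango"), (None, None)]
print([new_column(f1, f2) for f1, f2 in rows])  # [0, 3, 3, 1, 3]
```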

Can you please explain your code, so that someone new can understand what you have done?
@Nidhi, can something similar be performed if fruit1 and fruit2 are from different DataFrames?
