Pyspark: Pass multiple columns in UDF

I am writing a User Defined Function which will take all the columns except the first one in a dataframe and compute their sum (or apply any other operation). The dataframe can sometimes have 3 columns, sometimes 4, or more. It will vary.

I know I can hard-code 4 column names and pass them to the UDF, but since the number of columns varies, I would like to know how to get this done.

Here are two examples: in the first one we have two columns to add, and in the second one we have three columns to add.

https://i.stack.imgur.com/ZVQ72.png


Mariusz

If all the columns you want to pass to the UDF have the same data type, you can use array as the input parameter, for example:

>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+

>>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
...     .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
+---+---+---+---+------+
| ID|  A|  B|  C|Result|
+---+---+---+---+------+
|101|  1| 16|  8|    25|
+---+---+---+---+------+

Also works in Scala: myUdf(array($"col1",$"col2"))
How can it be implemented for columns with different types?
@constructor you can also use array to sum numbers of different types (e.g. integer and double: both will be cast to double).
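
For the case in the question, where the number of columns varies, the list of names can be taken from df.columns instead of being typed out. The following is a minimal sketch, assuming every column after the first is an integer. The names sum_values, add_row_sum and result_col are illustrative and do not come from the answer above, and skipping nulls is an added assumption. The pyspark imports sit inside the function only so that the row-level helper can be checked without a Spark installation.

```python
def sum_values(values):
    """Row-level logic: add up the values of one row, skipping nulls (None)."""
    return sum(v for v in values if v is not None)


def add_row_sum(df, result_col="Result"):
    """Append the sum of every column except the first one.

    Assumption: all summed columns are integers, so the UDF can return a long.
    """
    # Imported lazily so sum_values can be checked without a Spark installation.
    from pyspark.sql.functions import array, udf
    from pyspark.sql.types import LongType

    sum_cols = udf(sum_values, LongType())
    cols_to_sum = df.columns[1:]  # every column name except the first
    # array(*names) packs the selected columns into one array argument
    return df.withColumn(result_col, sum_cols(array(*cols_to_sum)))
```

With this, add_row_sum(df).show() could be called unchanged on both the two-column and the three-column dataframes from the answer.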
Vzzarr

Another simple way without Array and Struct.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

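# Named add_two rather than sum, so the built-in sum() is not shadowed
def add_two(x, y):
    return x + y

sum_cols = udf(add_two, IntegerType())

a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
# Each column is passed to the UDF as a separate argument
a.withColumn('Result', sum_cols('A', 'B')).show()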

I think you don't need to import struct.
kavi

Use struct instead of array

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct
sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()

Can you explain why one would use struct instead of array? I'm guessing that this is to handle columns of different types?
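
On the question in the comment above: array needs all of its elements to share one type (or be castable to one), while struct keeps each field's own type and hands the UDF a Row whose fields can be read by name or by position. A minimal sketch, assuming a hypothetical dataframe with an integer column qty and a double column price; line_total, add_line_total and FakeRow are illustrative names, not part of the answer.

```python
from collections import namedtuple


def line_total(row):
    """Row-level logic: multiply an integer field by a float field of one row."""
    if row.qty is None or row.price is None:
        return None  # propagate nulls instead of raising a TypeError
    return float(row.qty * row.price)


def add_line_total(df):
    """Append a 'total' column computed from the mixed-type columns qty and price."""
    # Imported lazily so line_total can be checked without a Spark installation.
    from pyspark.sql.functions import struct, udf
    from pyspark.sql.types import DoubleType

    total_udf = udf(line_total, DoubleType())
    # struct keeps qty as an integer and price as a double inside one Row
    return df.withColumn("total", total_udf(struct("qty", "price")))


# Local check with a namedtuple standing in for the Row that Spark would pass
FakeRow = namedtuple("FakeRow", ["qty", "price"])
print(line_total(FakeRow(qty=3, price=2.5)))  # 7.5
```

Reading fields by name (row.qty) rather than by index (x[0]) also keeps the UDF independent of the column order inside the struct.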
Роберт Надь

Maybe it's a late answer, but I don't like using UDFs without necessity, so:

from pyspark.sql.functions import col
from functools import reduce
data = [["a",1,2,5],["b",2,3,7],["c",3,4,8]]
df = spark.createDataFrame(data,["id","v1","v2",'v3'])

calculate = reduce(lambda a, x: a+x, map(col, ["v1","v2",'v3']))

df.withColumn("Result", calculate)
#
#id v1  v2  v3  Result
#a  1   2   5   8
#b  2   3   7   12
#c  3   4   8   15

Here you can use any operation that is implemented on Column. If you want a custom UDF with specific logic, you can use that too, because Column expressions compose into a tree of operations. There is no need to collect the values into an array and sum over it.

Compared with this, the array approach is worse from a performance perspective. Let's take a look at the physical plans for my case and for the array case.

my case:

== Physical Plan ==
*(1) Project [id#355, v1#356L, v2#357L, v3#358L, ((v1#356L + v2#357L) + v3#358L) AS Result#363L]
+- *(1) Scan ExistingRDD[id#355,v1#356L,v2#357L,v3#358L]

array case:

== Physical Plan ==
*(2) Project [id#339, v1#340L, v2#341L, v3#342L, pythonUDF0#354 AS Result#348]
+- BatchEvalPython [<lambda>(array(v1#340L, v2#341L, v3#342L))], [pythonUDF0#354]
   +- *(1) Scan ExistingRDD[id#339,v1#340L,v2#341L,v3#342L]

When possible, we should avoid UDFs, because Catalyst does not know how to optimize them.
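
The same UDF-free idea can be extended to the varying column count from the question by building the list of names from df.columns. This is a sketch under the assumption that every column after the first is numeric; fold_columns and add_row_result are hypothetical helper names. Because reduce only relies on the operator being defined, the folding helper can be tried on plain numbers before it is handed Column objects.

```python
import operator
from functools import reduce


def fold_columns(columns, op=operator.add):
    """Fold a non-empty sequence into one value with a binary operator.

    With pyspark Column objects this builds ((c1 op c2) op c3) ... as a
    native expression, so no Python UDF is involved.
    """
    return reduce(op, columns)


def add_row_result(df, op=operator.add, result_col="Result"):
    """Append op applied across every column except the first one."""
    # Imported lazily so fold_columns can be checked without a Spark installation.
    from pyspark.sql.functions import col

    cols = [col(name) for name in df.columns[1:]]
    return df.withColumn(result_col, fold_columns(cols, op))
```

For example, add_row_result(df) would sum v1, v2 and v3 from the dataframe above, and add_row_result(df, operator.mul) would multiply them instead.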


mdrishan

If you don't want to type out all your column names and would rather just dump all the columns into your UDF, you'll need to wrap a list comprehension within a struct.

from pyspark.sql.functions import struct, udf
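# No return type is given, so udf() defaults to StringType; pass e.g. IntegerType() as a second argument for a numeric column
sum_udf = udf(lambda x: sum(x[1:]))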
df_sum = df.withColumn("result", sum_udf(struct([df[col] for col in df.columns])))

Travis Hensrud

This is what I tried, and it seemed to work:

colsToSum = df.columns[1:]
df_sum = df.withColumn("rowSum", sum([df[col] for col in colsToSum]))

This only works because you're not calling a UDF. Otherwise, you'll need to wrap this list comprehension within a struct.
Deb
udf_ = spark.udf.register("udf_", self.funct)
print("registered udf................:", udf_)
df = df.withColumn('result', udf_(struct([df[col] for col in df.columns])))
print("after df call")

Here self.funct is defined in another class. I am trying to register this function with spark.udf.register and call it from df.withColumn, but returning the result is not working.

Output: registered udf................: after df call

But in reality, execution never enters the funct method of the function class.

The function class is as follows:

class function():
    def init:
    def funct(self, df):
        print("inside funct function")
        return F.col(S)*F.col(S)

S is an integer column of the df DataFrame.
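
A likely explanation, offered as a sketch rather than a confirmed diagnosis: withColumn is lazy, so the UDF body does not run until an action such as show() is called, and the UDF is handed one Row per record (the struct), not the DataFrame. It also has to return a plain Python value rather than a Column expression such as F.col(...). The names square_s and add_squared below are hypothetical, and the LongType return type is an assumption based on S being an integer.

```python
def square_s(row):
    """Row-level logic: receives one row of values, returns a plain Python int."""
    value = row["S"]
    return None if value is None else value * value


def add_squared(spark, df):
    """Register square_s as a UDF and append its result as the 'result' column."""
    # Imported lazily so square_s can be checked without a Spark installation.
    from pyspark.sql.functions import struct
    from pyspark.sql.types import LongType

    # Without an explicit return type, register() would default to a string result
    square_udf = spark.udf.register("square_s", square_s, LongType())
    # Nothing is executed here; the UDF runs only once an action such as
    # .show() or .collect() is called on the returned DataFrame.
    return df.withColumn("result", square_udf(struct(*df.columns)))
```

Calling add_squared(spark, df).show() would then trigger the UDF. For a plain square, the column expression F.col("S") * F.col("S") passed directly to withColumn would avoid the UDF altogether.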