I am writing a User Defined Function that takes all the columns of a dataframe except the first one and sums them (or applies any other operation). The dataframe can have 3 columns, 4 columns, or more; the number varies.
I know I can hard-code 4 column names and pass them to the UDF, but since the number of columns varies, I would like to know how to handle it.
Here are two examples: in the first one there are two columns to add, and in the second one there are three columns to add.
https://i.stack.imgur.com/ZVQ72.png
If all the columns you want to pass to the UDF have the same data type, you can use array as the input parameter, for example:
>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
... .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+
>>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
... .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
+---+---+---+---+------+
| ID|  A|  B|  C|Result|
+---+---+---+---+------+
|101|  1| 16|  8|    25|
+---+---+---+---+------+
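Since the number of columns varies, the list passed to array can be built from df.columns instead of being typed out. Here is a minimal sketch, assuming the first column is the ID and all remaining columns are integers. The names sum_values and add_row_sum are hypothetical, and treating NULL as 0 is a choice made for this sketch, not something the answer above specifies:

```python
def sum_values(values):
    """Sum a list of numbers, skipping None (SQL NULL); an assumption of this sketch."""
    return sum(v for v in values if v is not None)


def add_row_sum(df, result_name="Result"):
    """Append a column holding the sum of every column except the first."""
    # Imported here so that sum_values can be tested without Spark installed.
    from pyspark.sql.functions import array, udf
    from pyspark.sql.types import LongType

    sum_cols = udf(sum_values, LongType())
    value_cols = df.columns[1:]  # skip the first (ID) column
    # array() accepts column names, so the list can be unpacked directly.
    return df.withColumn(result_name, sum_cols(array(*value_cols)))
```

With either of the two dataframes above, add_row_sum(df).show() should then produce the Result column without the column names being listed by hand.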
Another simple way without Array and Struct.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# Named add rather than sum to avoid shadowing Python's built-in sum.
def add(x, y):
    return x + y
sum_cols = udf(add, IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()
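The two-argument function above fixes the number of columns at two. To keep the "no array, no struct" style with a varying number of columns, the Python function can take variable arguments and the column names can be unpacked at the call site. This is a sketch under the assumption that all value columns are integers; add_all and with_variadic_sum are hypothetical names:

```python
def add_all(*values):
    """Sum however many values are passed in."""
    return sum(values)


def with_variadic_sum(df, result_name="Result"):
    """Append the sum of every column except the first, one UDF argument per column."""
    # Imported here so that add_all can be tested without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import LongType

    add_all_udf = udf(add_all, LongType())
    # Each column name becomes one positional argument of the UDF.
    return df.withColumn(result_name, add_all_udf(*df.columns[1:]))
```

Note that this version does not guard against NULL values: a None in any column would make the Python sum raise an error.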
Use struct instead of array
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct
sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
struct instead of array? I'm guessing that this is to handle columns of different types?
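On struct versus array: as far as I know, an array needs a single element type for all of its entries, while a struct keeps each field's own type, which is why struct suits columns of mixed types. A small sketch of that idea, assuming numeric columns that may mix integers and doubles; sum_fields and with_struct_sum are hypothetical names, and skipping NULLs is a choice made here:

```python
def sum_fields(row):
    """Sum every non-NULL field of a struct and return a float."""
    # A struct reaches a Python UDF as a Row, which is a tuple subclass,
    # so iterating over it yields the field values in order.
    return float(sum(v for v in row if v is not None))


def with_struct_sum(df, names, result_name="Result"):
    """Append the sum of the named columns, passed to the UDF as one struct."""
    # Imported here so that sum_fields can be tested without Spark installed.
    from pyspark.sql.functions import struct, udf
    from pyspark.sql.types import DoubleType

    sum_udf = udf(sum_fields, DoubleType())
    return df.withColumn(result_name, sum_udf(struct(*names)))
```

Returning a float with DoubleType avoids the NULL results you can get when the declared return type does not match the Python value.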
Maybe it's a late answer, but I don't like using UDFs without necessity, so:
from pyspark.sql.functions import col
from functools import reduce
data = [["a",1,2,5],["b",2,3,7],["c",3,4,8]]
df = spark.createDataFrame(data,["id","v1","v2",'v3'])
calculate = reduce(lambda a, x: a+x, map(col, ["v1","v2",'v3']))
df.withColumn("Result", calculate)
#
#id v1 v2 v3 Result
#a 1 2 5 8
#b 2 3 7 12
#c 3 4 8 15
Here you can use any operation that is implemented in Column. Also, if you want to write a custom udf with specific logic, you can use it as well, because Column provides tree execution operations, without collecting the values into an array and summing over it.
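To illustrate "any operation": the same reduce pattern works with other binary operators, because Column overloads them the same way plain numbers do. A sketch, with combine and with_combined as hypothetical helper names:

```python
from functools import reduce
import operator


def combine(items, op=operator.add):
    """Fold items pairwise with op; works for numbers and for Spark Columns."""
    return reduce(op, items)


def with_combined(df, names, op=operator.add, result_name="Result"):
    """Append a column built by folding the named columns with op."""
    # Imported here so that combine can be tested without Spark installed.
    from pyspark.sql.functions import col

    return df.withColumn(result_name, combine([col(n) for n in names], op))
```

For example, with_combined(df, ["v1", "v2", "v3"], operator.mul) would build the expression v1 * v2 * v3 instead of a sum, still without a UDF.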
Compared with this, processing the columns as an array through a UDF is worse from a performance perspective. Let's take a look at the physical plans for my case and for the array case.
my case:
== Physical Plan ==
*(1) Project [id#355, v1#356L, v2#357L, v3#358L, ((v1#356L + v2#357L) + v3#358L) AS Result#363L]
+- *(1) Scan ExistingRDD[id#355,v1#356L,v2#357L,v3#358L]
array case:
== Physical Plan ==
*(2) Project [id#339, v1#340L, v2#341L, v3#342L, pythonUDF0#354 AS Result#348]
+- BatchEvalPython [<lambda>(array(v1#340L, v2#341L, v3#342L))], [pythonUDF0#354]
+- *(1) Scan ExistingRDD[id#339,v1#340L,v2#341L,v3#342L]
Whenever possible, we should avoid using UDFs, as Catalyst does not know how to optimize them.
If you don't want to type out all your column names and would rather just dump all the columns into your UDF, you'll need to wrap a list comprehension within a struct.
from pyspark.sql.functions import struct, udf
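# x[1:] skips the first column. Without an explicit return type, udf() defaults to StringType.
sum_udf = udf(lambda x: sum(x[1:]))
# The loop variable is named c so it does not shadow pyspark.sql.functions.col.
df_sum = df.withColumn("result", sum_udf(struct([df[c] for c in df.columns])))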
This is what I tried, and it seemed to work:
colsToSum = df.columns[1:]
df_sum = df.withColumn("rowSum", sum([df[col] for col in colsToSum]))
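This works because Python's built-in sum starts from 0 and applies + to each item, and Column overloads + (including with a plain number on the left-hand side). One thing to watch: after from pyspark.sql.functions import *, the name sum refers to Spark's aggregate function instead of the built-in. A sketch that sidesteps this by calling builtins.sum explicitly; row_sum and with_row_sum are hypothetical names:

```python
import builtins


def row_sum(terms):
    """Add up terms with Python's built-in sum, even if the name sum is shadowed."""
    return builtins.sum(terms)


def with_row_sum(df, result_name="rowSum"):
    """Append the sum of every column except the first, without a UDF."""
    cols_to_sum = df.columns[1:]
    return df.withColumn(result_name, row_sum([df[c] for c in cols_to_sum]))
```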
udf_ = spark.udf.register("udf_",self.funct)
print("registered udf................:",udf_)
df = df.withColumn('result',udf_(struct([df[col] for col in df.columns])))
print("after df call")
Here self.funct is defined in another class. I am trying to register this function using spark.udf.register and call it from df.withColumn, but returning the result is not working.
Output: registered udf................:
But in reality, execution never enters the funct function of the function class.
The function class is as follows:
class function():
    def __init__(self): ...
    def funct(self, df):
        print("inside funct function")
        return F.col(S)*F.col(S)
The S column is part of the df dataframe and is an int.
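A few things may explain this, though it is a guess because the full code is not shown. Transformations such as withColumn are lazy, so the UDF body does not run until an action like show() is called, and any print inside it goes to the executor's output. Also, a UDF receives the values of one row (here a Row built by struct), not a dataframe, so it should return a plain Python value rather than a Column expression like F.col(S)*F.col(S). A sketch under those assumptions, with square_s and with_square as hypothetical names:

```python
def square_s(row, field="S"):
    """Return the square of one field of the row, or None if it is NULL."""
    value = row[field]  # a Row supports lookup by field name
    return None if value is None else value * value


def with_square(spark, df, result_name="result"):
    """Register square_s as a UDF and apply it to a struct of all columns."""
    # Imported here so that square_s can be tested without Spark installed.
    from pyspark.sql.functions import struct
    from pyspark.sql.types import LongType

    square_udf = spark.udf.register("square_s", square_s, LongType())
    return df.withColumn(result_name, square_udf(struct(*df.columns)))
```

Calling with_square(spark, df).show() triggers the computation. For a simple square, the Column expression F.col("S") * F.col("S") passed directly to withColumn would avoid the UDF altogether.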
myUdf(array($"col1",$"col2")): array also works if you sum numbers of different types (i.e. integer and double -> both will be cast to double)