
pyspark collect_set or collect_list with groupby

How can I use collect_set or collect_list on a dataframe after groupby? For example, df.groupby('key').collect_set('values') raises an error: AttributeError: 'GroupedData' object has no attribute 'collect_set'

Can you post some sample data that will throw this error so that we can debug your issue?

pault

You need to use agg. Example:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

Note that the example above creates a HiveContext, which Spark 1.x requires for collect_set and collect_list. See https://stackoverflow.com/a/35529093/690430 for dealing with different Spark versions.

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+
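The output above shows that nulls are dropped and that collect_set keeps only distinct values. As a rough illustration, here is a plain-Python model of those semantics on the same sample rows. The helper names collect_list_model and collect_set_model are made up for this sketch and are not part of PySpark; the set result is sorted only to make it deterministic, whereas Spark does not guarantee element order.

```python
from collections import defaultdict

# Same sample rows as the DataFrame above: (id, code, name)
rows = [
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
]


def collect_list_model(rows, key_idx, val_idx):
    """Model of collect_list: all non-null values per group, in input order."""
    groups = defaultdict(list)
    for row in rows:
        bucket = groups[row[key_idx]]  # create the group even if every value is null
        value = row[val_idx]
        if value is not None:  # nulls are skipped
            bucket.append(value)
    return dict(groups)


def collect_set_model(rows, key_idx, val_idx):
    """Model of collect_set: distinct non-null values per group (sorted here for determinism)."""
    lists = collect_list_model(rows, key_idx, val_idx)
    return {key: sorted(set(values)) for key, values in lists.items()}


print(collect_set_model(rows, 0, 1))   # codes per id
print(collect_list_model(rows, 0, 2))  # names per id
```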

collect_set() returns the distinct elements and collect_list() returns all elements; both skip nulls.
To calculate a count, you can either apply the size function to the result of collect_set or collect_list, or use the plain count function. I am using a window to get the count of transactions attached to an account.
How can I get the output of collect_list as a dict when I have multiple columns inside the list, e.g. agg(collect_list(struct(df.f1, df.f2, df.f3)))? The output should be [f1:value, f2:value, f3:value] for each group.
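One possible approach to the struct question, sketched under assumptions: collect the grouped rows to the driver and convert each struct (a Row in PySpark) with Row.asDict(). The function names structs_to_dicts and collect_structs_as_dicts and the column alias "items" are hypothetical, and pyspark is imported inside the function only so the helper can be loaded without Spark. Because collect() pulls everything to the driver, this is only reasonable when the grouped result is small.

```python
def structs_to_dicts(structs):
    """Convert a list of collected struct values (objects with asDict()) to plain dicts."""
    return [s.asDict() for s in structs]


def collect_structs_as_dicts(df, key, cols):
    """Group df by `key` and return {key value: [dict per row]} on the driver.

    `cols` is a list of column names to pack into each struct.
    """
    from pyspark.sql import functions as F  # lazy import: only needed when Spark is used

    grouped = df.groupby(key).agg(
        F.collect_list(F.struct(*cols)).alias("items")
    )
    # Each row["items"] is a list of Row objects, one per input row of the group
    return {row[key]: structs_to_dicts(row["items"]) for row in grouped.collect()}
```

With the sample DataFrame from the answer, collect_structs_as_dicts(df, "id", ["code", "name"]) would return one list of dicts per id.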
When I run this on a large dataframe, collect_set does not seem to give me the correct values for a group. Any thoughts?
Allen

If your dataframe is large, you can try using a pandas UDF (GROUPED_AGG) to avoid memory errors. It is also much faster.

Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with groupBy().agg() and pyspark.sql.Window. It defines an aggregation from one or more pandas.Series to a scalar value, where each pandas.Series represents a column within the group or window. (Source: pandas udf)

example:

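import pyspark.sql.functions as F

# Grouped-aggregate pandas UDF: receives all "name" values of one group as a pandas.Series
@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
    # Drop nulls first; str.join raises TypeError on None values
    return ', '.join(name.dropna())

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))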
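For comparison, the same comma-joined string can probably be produced with built-in functions only, by wrapping collect_list in concat_ws. This is a sketch, not a benchmark: the function names join_names_builtin and join_names_reference are invented for illustration, and the plain-Python reference only states the result expected for a single group.

```python
def join_names_builtin(df):
    """One row per id, with the non-null names joined by ', ' using only built-ins."""
    from pyspark.sql import functions as F  # lazy import: only needed when Spark is used

    return df.groupby("id").agg(
        # collect_list skips nulls; concat_ws accepts an array<string> column
        F.concat_ws(", ", F.collect_list("name")).alias("names")
    )


def join_names_reference(values):
    """Plain-Python statement of the intended result for one group's values."""
    return ", ".join(v for v in values if v is not None)
```

Since nothing leaves the JVM here, this avoids the Arrow serialization step that a pandas UDF needs.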

I do not think a custom UDF is faster than a spark builtin
I know that a pandas UDF is way slower than a spark builtin (and also, that a pandas UDF requires more memory from your cluster)! What's faster, pure java/scala, or java that has to call python on a data structure that also has to be serialized via arrow into a pandas DF?