ChatGPT解决这个技术问题 Extra ChatGPT

pyspark collect_set 或 collect_list 与 groupby

如何在 groupby 之后在数据帧上使用 collect_setcollect_list。例如:df.groupby('key').collect_set('values')。我收到一个错误:AttributeError: 'GroupedData' object has no attribute 'collect_set'

您能否发布一些会引发此错误的示例数据,以便我们调试您的问题?

p
pault

您需要使用 agg。例子:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")

sqlContext = HiveContext(sc)

df = sqlContext.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

注意在上面你必须创建一个 HiveContext。有关处理不同 Spark 版本的信息,请参阅 https://stackoverflow.com/a/35529093/690430

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+

collect_set() 包含不同的元素,而 collect_list() 包含所有元素(空值除外)
collect_set 或 collect_list 上的 size 函数将更好地计算计数值或使用普通计数函数。我正在使用一个窗口来获取附加到帐户的交易计数。
当我在列表中有多个列时,如何将 collect_list 的输出作为字典,例如:agg(collect_list(struct(df.f1,df.f2,df.f3)))。每个组的输出应该是 [f1:value,f2:value,f3:value]。
在大型数据帧上执行此操作时,collect_set 似乎没有让我得到正确的组值。有什么想法吗?
A
Allen

如果您的数据框很大,您可以尝试使用 pandas udf(GROUPED_AGG) 来避免内存错误。它也快得多。

分组聚合 Pandas UDF 类似于 Spark 聚合函数。分组聚合 Pandas UDF 与 groupBy().agg() 和 pyspark.sql.Window 一起使用。它定义了从一个或多个 pandas.Series 到一个标量值的聚合,其中每个 pandas.Series 代表组或窗口中的一列。熊猫 udf

例子:

import pyspark.sql.functions as F

@F.pandas_udf('string', F.PandasUDFType.GROUPED_AGG)
def collect_list(name):
    return ', '.join(name)

grouped_df = df.groupby('id').agg(collect_list(df["name"]).alias('names'))

我不认为自定义 UDF 比内置 spark 快
我知道 pandas UDF 比内置 spark 慢得多(而且,pandas UDF 需要集群的更多内存)!什么更快,纯 java/scala 或 java 必须在数据结构上调用 python,该数据结构也必须通过箭头序列化为 pandas DF?