ChatGPT解决这个技术问题 Extra ChatGPT

PySpark groupBy 中的中位数/分位数

我想计算 Spark 数据帧上的组分位数(使用 PySpark)。近似或精确的结果都可以。我更喜欢可以在 groupBy / agg 的上下文中使用的解决方案,这样我就可以将它与其他 PySpark 聚合函数混合使用。如果由于某种原因这是不可能的,那么另一种方法也可以。

This question 是相关的,但没有说明如何将 approxQuantile 用作聚合函数。

我还可以访问 percentile_approx Hive UDF,但我不知道如何将它用作聚合函数。

为了具体起见,假设我有以下数据框:

from pyspark import SparkContext
import pyspark.sql.functions as f

sc = SparkContext()    

df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()

预期结果是:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+
我认为您可能可以在这种情况下使用底层 rdd 和用于计算分布式分位数的算法(例如 here 和其中的链接)推出自己的。事实上,他们链接到的 github 上有一些 pyspark 示例。

C
Chris A.

我想你不再需要它了。但是会留在这里留给后代(即下周我忘记的时候)。

from pyspark.sql import Window
import pyspark.sql.functions as F

grp_window = Window.partitionBy('grp')
magic_percentile = F.expr('percentile_approx(val, 0.5)')

df.withColumn('med_val', magic_percentile.over(grp_window))

或者为了准确解决您的问题,这也有效:

df.groupBy('grp').agg(magic_percentile.alias('med_val'))

作为奖励,您可以传递一组百分位数:

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

作为回报,你会得到一份清单。


很干净的答案。你知道如何使用 Pandas UDF(又名 Vectorized UDF)来完成吗?
@CesareIurlaro,我只将它包装在 UDF 中。从未尝试过熊猫
你介意试试吗?性能确实应该在那里大放异彩:databricks.com/blog/2017/10/30/…
使用 Spark 3.1.0,现在可以在 PySpark groupby 聚合中直接使用 percentile_approxdf.groupBy("key").agg(percentile_approx("value", 0.5, lit(1000000)).alias("median")) spark.apache.org/docs/latest/api/python/reference/api/…
@thentangler:前者是精确的百分位数,对于大型数据集不是可扩展的操作,后者是近似的但可扩展的。
d
desertnaut

由于您可以访问 percentile_approx,一个简单的解决方案是在 SQL 命令中使用它:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df.registerTempTable("df")
df2 = sqlContext.sql("select grp, percentile_approx(val, 0.5) as med_val from df group by grp")

这可行,但我更喜欢可以在 PySpark 级别的 groupBy / agg 中使用的解决方案(以便我可以轻松地将它与其他 PySpark 聚合函数混合使用)。
@abeboparebop 我不相信只能使用 groupByagg,但是,使用基于窗口的方法也应该有效。
我已经在问题中阐明了我的理想解决方案。显然,这个答案可以完成工作,但这并不是我想要的。我会把这个问题留一段时间,看看是否会出现更清晰的答案。
d
desertnaut

(更新:现在有可能,请参阅上面接受的答案)

不幸的是,据我所知,使用“纯”PySpark 命令似乎不可能做到这一点(Shaido 的解决方案提供了 SQL 的解决方法),原因非常基本:与其他聚合相比函数,例如 meanapproxQuantile 不返回 Column 类型,而是返回一个 list

让我们看一个示例数据的快速示例:

spark.version
# u'2.2.0'

import pyspark.sql.functions as func
from pyspark.sql import DataFrameStatFunctions as statFunc

# aggregate with mean works OK:
df_grp_mean = df.groupBy('grp').agg(func.mean(df['val']).alias('mean_val'))
df_grp_mean.show()
# +---+--------+ 
# |grp|mean_val|
# +---+--------+
# |  B|     5.0|
# |  A|     2.0|
# +---+--------+

# try aggregating by median:
df_grp_med = df.groupBy('grp').agg(statFunc(df).approxQuantile('val', [0.5], 0.1))
# AssertionError: all exprs should be Column

# mean aggregation is a Column, but median is a list:

type(func.mean(df['val']))
# pyspark.sql.column.Column

type(statFunc(df).approxQuantile('val', [0.5], 0.1))
# list

我怀疑基于窗口的方法会有所作为,因为正如我所说的那样,根本原因是非常基本的。

另请参阅 my answer here 了解更多详细信息。


K
Kxrr

使用 pyspark==2.4.5 执行此操作的最简单方法是:

df \
    .groupby('grp') \
    .agg(expr('percentile(val, array(0.5))')[0].alias('50%')) \
    .show()

输出:

|grp|50%|
+---+---+
|  B|5.0|
|  A|2.0|
+---+---+

J
Jan_ewazz

pyspark >= 3.1.0 使用 percentile_approx 似乎完全解决了它

import pyspark.sql.functions as func    

df.groupBy("grp").agg(func.percentile_approx("val", 0.5).alias("median"))

有关详细信息,请参阅:https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.percentile_approx.html


e
eid

“percentile_approx(val, 0.5)”的问题:如果例如范围是 [1,2,3,4],则此函数返回 2(作为中值)以下函数返回 2.5:

import statistics

median_udf = F.udf(lambda x: statistics.median(x) if bool(x) else None, DoubleType())

... .groupBy('something').agg(median_udf(F.collect_list(F.col('value'))).alias('median'))