我试图找出在 Spark 数据框列中获得最大值的最佳方法。
考虑以下示例:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
这创造了:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
我的目标是在 A 列中找到最大值(通过检查,这是 3.0)。使用 PySpark,我可以想到以下四种方法:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
以上每一个都给出了正确的答案,但在没有 Spark 分析工具的情况下,我无法判断哪个是最好的。
任何来自直觉或经验的想法,关于上述哪种方法在 Spark 运行时或资源使用方面最有效,或者是否有比上述方法更直接的方法?
df.select(max("A")).collect()[0].asDict()['max(A)']
呢?看起来等同于方法 2,但更紧凑,也比方法 3 更直观。
>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor| timestamp| uid| x| y|
+-----+--------------------+--------+----------+-----------+
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|
>row1 = df1.agg({"x": "max"}).collect()[0]
>print row1
Row(max(x)=110.33613)
>print row1["max(x)"]
110.33613
答案与方法3几乎相同。但似乎可以删除方法3中的“asDict()”
数据帧的特定列的最大值可以通过使用来实现 -
your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
备注:Spark 旨在用于大数据 - 分布式计算。示例 DataFrame 的大小非常小,因此可以相对于小示例更改实际示例的顺序。
最慢:Method_1,因为 .describe("A")
计算最小值、最大值、平均值、标准差和计数(对整列进行 5 次计算)。
中:Method_4,因为,.rdd
(DF 到 RDD 的转换)减慢了这个过程。
更快:Method_3 ~ Method_2 ~ Method_5,因为逻辑非常相似,所以 Spark 的催化剂优化器遵循非常相似的逻辑,操作次数最少(获取特定列的最大值,收集单值数据帧;.asDict()
增加了一点额外的-时间比较 2、3 与 5)
import pandas as pd
import time
time_dict = {}
dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#-- For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = self.sqlContext.createDataFrame(pdf)
tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)
tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)
tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)
tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)
tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)
print time_dict
以毫秒 (ms) 为单位的集群边缘节点上的结果:
小 DF(毫秒):{'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}
更大的 DF(毫秒):{'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}
另一种方法:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
根据我的数据,我得到了以下基准:
df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s
df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s
df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s
他们都给出了相同的答案
下面的示例显示了如何获取 Spark 数据框列中的最大值。
from pyspark.sql.functions import max
df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
result = df.select([max("A")]).show()
result.show()
+------+
|max(A)|
+------+
| 3.0|
+------+
print result.collect()[0]['max(A)']
3.0
同样可以计算出min、mean等,如下所示:
from pyspark.sql.functions import mean, min, max
result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
| 2.0| 1.0| 3.0|
+------+------+------+
首先添加导入行:
from pyspark.sql.functions import min, max
要在数据框中找到年龄的最小值:
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
要在数据框中找到年龄的最大值:
df.agg(max("age")).show()
+--------+
|max(age)|
+--------+
| 77|
+--------+
我使用了这个链中已经存在的另一种解决方案(@satprem Rath)。
要在数据框中找到年龄的最小值:
df.agg(min("age")).show()
+--------+
|min(age)|
+--------+
| 29|
+--------+
编辑:添加更多上下文。
虽然上述方法打印了结果,但我在将结果分配给变量以供以后重用时遇到了问题。
因此,要仅获取分配给变量的 int
值:
from pyspark.sql.functions import max, min
maxValueA = df.agg(max("A")).collect()[0][0]
maxValueB = df.agg(max("B")).collect()[0][0]
如果有人想知道如何使用 Scala(使用 Spark 2.0.+)来做到这一点,请看这里:
scala> df.createOrReplaceTempView("TEMP_DF")
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF").
collect()(0).getInt(0)
scala> print(myMax)
117
我相信最好的解决方案是使用 head()
考虑到你的例子:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
使用 python 的 agg 和 max 方法,我们可以得到如下值:
from pyspark.sql.functions import max df.agg(max(df.A)).head()[0]
这将返回:3.0
确保您有正确的导入:
from pyspark.sql.functions import max
我们这里使用的 max 函数是 pySPark sql 库函数,而不是 python 默认的 max 函数。
from pyspark.sql.functions import max
我们在这里使用的最大值是 pySpark sql 函数而不是 python 最大值最好使用别名from pyspark.sql.functions import max as mx
要获得价值,请使用其中任何一个
df1.agg({"x": "max"}).collect()[0][0] df1.agg({"x": "max"}).head()[0] df1.agg({ "x": "max"}).first()[0]
或者我们可以为 'min' 做这些
from pyspark.sql.functions import min, max
df1.agg(min("id")).collect()[0][0]
df1.agg(min("id")).head()[0]
df1.agg(min("id")).first()[0]
在 pyspark 你可以这样做:
max(df.select('ColumnName').rdd.flatMap(lambda x: x).collect())
这是一种懒惰的方法,只需进行计算统计:
df.write.mode("overwrite").saveAsTable("sampleStats")
Query = "ANALYZE TABLE sampleStats COMPUTE STATISTICS FOR COLUMNS " + ','.join(df.columns)
spark.sql(Query)
df.describe('ColName')
或者
spark.sql("Select * from sampleStats").describe('ColName')
或者你可以打开一个蜂巢壳然后
describe formatted table sampleStats;
您将在属性中看到统计信息 - 最小值、最大值、不同值、空值等。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val testDataFrame = Seq(
(1.0, 4.0), (2.0, 5.0), (3.0, 6.0)
).toDF("A", "B")
val (maxA, maxB) = testDataFrame.select(max("A"), max("B"))
.as[(Double, Double)]
.first()
println(maxA, maxB)
结果是(3.0,6.0),和testDataFrame.agg(max($"A"), max($"B")).collect()(0)
一样。但是,testDataFrame.agg(max($"A"), max($"B")).collect()(0)
返回一个List,[3.0,6.0]
collect()
返回一个列表(在本例中为单个项目),因此您需要访问列表中的第一个(唯一)项目collect()[0]
,则可以使用 @Burthead()
。.collect()[0]
有效,但使用.first()[0]
可能更安全。根据定义,collect() 将“在驱动程序中将数据集的所有元素作为数组返回。”,这是一台机器。如果语法错误,最终可能会使用过多的内存。