ChatGPT解决这个技术问题 Extra ChatGPT

将 spark DataFrame 列转换为 python 列表

我处理具有两列 mvv 和 count 的数据框。

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |

我想获得两个包含 mvv 值和计数值的列表。就像是

mvv = [1,2,3,4]
count = [5,9,3,1]

所以,我尝试了以下代码: 第一行应该返回一个 python 行列表。我想看到第一个值:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)

但我收到第二行的错误消息:

属性错误:getInt

从 Spark 2.3 开始,此代码是最快且最不可能导致 OutOfMemory 异常的代码:list(df.select('mvv').toPandas()['mvv'])Arrow was integrated into PySpark 显着加快了 toPandas。如果您使用的是 Spark 2.3+,请不要使用其他方法。有关更多基准测试详细信息,请参阅我的答案。

F
Francesco Boi

看,为什么你正在做的这种方式不起作用。首先,您尝试从 Row 类型中获取整数,您收集的输出如下所示:

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

如果你采取这样的做法:

>>> firstvalue = mvv_list[0].mvv
Out: 1

您将获得 mvv 值。如果你想要数组的所有信息,你可以采取如下方式:

>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()]
>>> mvv_array
Out: [1,2,3,4]

但是,如果您对另一列尝试相同的操作,您会得到:

>>> mvv_count = [int(row.count) for row in mvv_list.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

发生这种情况是因为 count 是一个内置方法。该列与 count 同名。解决方法是将 count 的列名更改为 _count

>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]

但是不需要这种解决方法,因为您可以使用字典语法访问该列:

>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_list.collect()]

它最终会奏效!


它适用于第一列,但它不适用于我认为的列数(火花的函数计数)
您可以添加您对计数的操作吗?在评论中添加这里。
感谢您的回复所以这条线工作 mvv_list = [int(i.mvv) for i in mvv_count.select('mvv').collect()] 但不是这个 count_list = [int(i.count) for i in mvv_count .select('count').collect()] 返回无效语法
不需要像这样添加此 select('count') 用法:count_list = [int(i.count) for i in mvv_list.collect()] 我将示例添加到响应中。
@a.moussa [i.['count'] for i in mvv_list.collect()] 明确使用名为“count”的列而不是 count 函数
N
Neo

跟随一个班轮给出你想要的列表。

mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()

性能方面,此解决方案比您的解决方案 mvv_list = [int(i.mvv) for i in mvv_count.select('mvv').collect()] 快得多
这不只适用于 OP 的问题吗?: mvv = mvv_count_df.select("mvv").rdd.flatMap(list).collect()
M
Muhammad Raihan Muhaimin

这将为您提供所有元素作为列表。

mvv_list = list(
    mvv_count_df.select('mvv').toPandas()['mvv']
)

这是 Spark 2.3+ 的最快和最有效的解决方案。请参阅我的答案中的基准测试结果。
P
Powers

我进行了基准分析,list(mvv_count_df.select('mvv').toPandas()['mvv']) 是最快的方法。我很惊讶。

我使用带有 Spark 2.4.5 的 5 节点 i3.xlarge 集群(每个节点有 30.5 GB 的 RAM 和 4 个内核)在 10 万 / 1 亿行数据集上运行不同的方法。数据均匀分布在 20 个快速压缩的 Parquet 文件中,并具有单列。

这是基准测试结果(以秒为单位的运行时间):

+-------------------------------------------------------------+---------+-------------+
|                          Code                               | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect()    |     0.4 | 55.3        |
| list(df.select('col_name').toPandas()['col_name'])          |     0.4 | 17.5        |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()|     0.9 | 69          |
| [row[0] for row in df.select('col_name').collect()]         |     1.0 | OOM         |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] |     1.2 | *           |
+-------------------------------------------------------------+---------+-------------+

* cancelled after 800 seconds

在驱动节点上收集数据时要遵循的黄金法则:

尝试用其他方法解决问题。将数据收集到驱动程序节点的成本很高,不能利用 Spark 集群的强大功能,因此应尽可能避免。

收集尽可能少的行。在收集数据之前聚合、去重、过滤和修剪列。尽可能少地向驱动节点发送数据。

toPandas was significantly improved in Spark 2.3。如果您使用的是早于 2.3 的 Spark 版本,这可能不是最好的方法。

有关更多详细信息/基准测试结果,请参阅 here


这确实令人惊讶,因为我认为 toPandas 会执行最差的操作之一,因为我们正在进行额外的数据结构转换。 Spark 团队在优化方面一定做得很好。感谢基准!
您还可以测试@phgui 的答案吗?它看起来也很有效。 mvv_list = df.select(collect_list("mvv")).collect()[0][0]
I
Itachi

下面的代码会帮助你

mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()

这应该是公认的答案。原因是您在整个过程中都处于火花上下文中,然后在最后进行收集,而不是更早地退出火花上下文,这可能会导致更大的收集,具体取决于您在做什么。
l
luminousmen

根据我的数据,我得到了这些基准:

>>> data.select(col).rdd.flatMap(lambda x: x).collect()

0.52 秒

>>> [row[col] for row in data.collect()]

0.271 秒

>>> list(data.select(col).toPandas()[col])

0.427 秒

结果是一样的


如果您使用 toLocalIterator 而不是 collect,它甚至应该更节省内存[row[col] for row in data.toLocalIterator()]
L
LaSul

如果您收到以下错误:

AttributeError:“列表”对象没有属性“收集”

此代码将解决您的问题:

mvv_list = mvv_count_df.select('mvv').collect()

mvv_array = [int(i.mvv) for i in mvv_list]

我也遇到了这个错误,这个解决方案解决了这个问题。但是为什么我会收到错误消息? (许多其他人似乎不明白这一点!)
p
phgui

一种可能的解决方案是使用 pyspark.sql.functions 中的 collect_list() 函数。这会将所有列值聚合到一个 pyspark 数组中,该数组在收集时会转换为 python 列表:

mvv_list   = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0] 

e
eemilk

让我们创建有问题的数据框

df_test = spark.createDataFrame(
    [
        (1, 5),
        (2, 9),
        (3, 3),
        (4, 1),
    ],
    ['mvv', 'count']
)
df_test.show()

这使

+---+-----+
|mvv|count|
+---+-----+
|  1|    5|
|  2|    9|
|  3|    3|
|  4|    1|
+---+-----+

然后应用 rdd.flatMap(f).collect() 获取列表

test_list = df_test.select("mvv").rdd.flatMap(list).collect()
print(type(test_list))
print(test_list)

这使

<type 'list'>
[1, 2, 3, 4]

S
Strick

您可以首先收集 df 并返回 Row 类型的列表

row_list = df.select('mvv').collect()

遍历行以转换为列表

sno_id_array = [ int(row.mvv) for row in row_list]

sno_id_array 
[1,2,3,4]

使用平面图

sno_id_array = df.select("mvv").rdd.flatMap(lambda x: x).collect()

a
ashkan

尽管有很多答案,但当您需要将列表与 whenisin 命令结合使用时,其中一些将不起作用。生成平面值列表的最简单但有效的方法是使用列表推导和 [0] 来避免行名称:

flatten_list_from_spark_df=[i[0] for i in df.select("your column").collect()]

另一种方法是使用 panda 数据框,然后使用 list 函数,但它不像 this.a 那样方便和有效


这是最好的答案。 RDD 已经过时并且难以使用。