
How to loop through each row of a DataFrame in PySpark

E.g

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

The above statement prints the entire table to the terminal, but I want to access each row of that table with a for or while loop to perform further calculations.

I believe I provided a correct answer. Can you select, or provide feedback to improve?


You simply cannot. DataFrames, like other distributed data structures, are not iterable and can be accessed only through dedicated higher-order functions and/or SQL methods.

You can, of course, collect:

for row in df.rdd.collect():
    do_something(row)

or convert to a local iterator with toLocalIterator:

for row in df.rdd.toLocalIterator():
    do_something(row)

and iterate locally as shown above, but it beats all purpose of using Spark.


Newbie question: As iterating an already collected dataframe "beats the purpose", from a dataframe, how should I pick the rows I need for further processing?
Did some reading and looks like forming a new dataframe with where() would be the Spark-way of doing it properly.
"it beats all purpose of using Spark" is pretty strong and subjective language. The collect() method exists for a reason, and there are many valid use cases for it. Once Spark is done processing the data, iterating through the final results might be the only way to integrate with or write to external APIs or legacy systems.
David

To "loop" and take advantage of Spark's parallel computation framework, you could define a custom function and use map.

def customFunction(row):
    return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)

or

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

The custom function is then applied to every row of the DataFrame. Note that sample2 will be an RDD, not a DataFrame.

map may be needed if you are going to perform more complex computations. If you just need to add a simple derived column, you can use withColumn, which returns a DataFrame.

sample3 = sample.withColumn('age2', sample.age + 2)

aaronsteers

Using a list comprehension in Python, you can collect an entire column of values into a list in just two lines:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

In the above example, we return a list of tables in database 'default', but the same can be adapted by replacing the query used in sql().

Or more abbreviated:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

And for your example of three columns, we can create a list of dictionaries, and then iterate through them in a for loop.

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))

Dustin Sun

Give it a try like this:

    result = spark.createDataFrame(
        [('SpeciesId', 'int'), ('SpeciesName', 'string')],
        ["col_name", "data_type"])
    for f in result.collect():
        print(f.col_name)

TobiSH

It might not be the best practice, but you can simply target a specific column using collect(), export it as a list of Rows, and loop through the list.

Assume this is your DataFrame, df3:

+----------+----------+-------------------+-----------+-----------+------------------+
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03|             51148|
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04|            -35252|
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04|          20963548|
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21|          22173148|
+----------+----------+-------------------+-----------+-----------+------------------+

To loop through the rows of the Date column:

rows = df3.select('Date').collect()

final_list = []
for i in rows:
    final_list.append(i[0])

print(final_list)

Katya Willard

If you want to do something to each row in a DataFrame object, use map on its underlying RDD (df.rdd.map in Spark 2.0 and later). This lets you perform further calculations on each row. It's the equivalent of looping across the entire dataset from 0 to len(dataset)-1.

Note that this will return a PipelinedRDD, not a DataFrame.


Keyur Potdar

In the answer above, the line

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

should be

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

because name, age, and city are not variables but simply string keys of the dictionary.


Is a square bracket missing from the right-hand side of code line 2?
When you're not addressing the original question, don't post it as an answer; comment instead, or suggest an edit to the partially correct answer.