ChatGPT解决这个技术问题 Extra ChatGPT

Updating a dataframe column in spark

Looking at the new spark DataFrame API, it is unclear whether it is possible to modify dataframe columns.

How would I go about changing a value in row x column y of a dataframe?

In pandas this would be:

df.ix[x,y] = new_value

Edit: Consolidating what was said below, you can't modify the existing dataframe as it is immutable, but you can return a new dataframe with the desired modifications.

If you just want to replace a value in a column based on a condition, like np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

If you want to perform some operation on a column and create a new column that is added to the dataframe:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

If you want the new column to have the same name as the old column, you could add the additional step:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
if you want to access the DataFrame by index, you need to build an index first. See, e.g. stackoverflow.com/questions/26828815/…. Or add an index column with your own index.

k
karlson

While you cannot modify a column as such, you may operate on a column and return a new DataFrame reflecting that change. For that you'd first create a UserDefinedFunction implementing the operation to apply and then selectively apply that function to the targeted column only. In Python:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_df now has the same schema as old_df (assuming that old_df.target_column was of type StringType as well) but all values in column target_column will be new_value.


this is an actual answer to the problem thanks! yet, the spark jobs don't finish for me, all executors get los. can you think of an alternative way? I use it with a bit more complex UDF where I do transformation to strings. There is no pandas-similar Syntax like new_df = old_df.col1.apply(lambda x: func(x))?
there is also: new_df = old_df.withColumn('target_column', udf(df.name))
Yes, that should work fine. Keep in mind that UDFs can only take columns as parameters. If you want to pass other data into the function you have to partially apply it first.
@KatyaHandler If you just want to duplicate a column, one way to do so would be to simply select it twice: df.select([df[col], df[col].alias('same_column')]), where col is the name of the column you want to duplicate. With the latest Spark release, a lot of the stuff I've used UDFs for can be done with the functions defined in pyspark.sql.functions. UDF performance in Pyspark is really poor, so that might really be worth looking into: spark.apache.org/docs/latest/api/python/…
it is StringType not Stringtype in udf = UserDefinedFunction(lambda x: 'new_value', Stringtype())
P
Paul

Commonly when updating a column, we want to map an old value to a new value. Here's a way to do that in pyspark without UDF's:

# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
    F.when(df[update_col]==old_value,new_value).
    otherwise(df[update_col])).

How to use this, when my update_col is a list Ex-=: update_cols=['col1','col2','col3'] ?
Use a for loop.
J
Jacek Laskowski

DataFrames are based on RDDs. RDDs are immutable structures and do not allow updating elements on-site. To change values, you will need to create a new DataFrame by transforming the original one either using the SQL-like DSL or RDD operations like map.

A highly recommended slide deck: Introducing DataFrames in Spark for Large Scale Data Science.


What exactly is the dataframe abstraction adding then that couldn't already be done in the same amount of lines with a table?
" DataFrames introduce new simplified operators for filtering, aggregating, and projecting over large datasets. Internally, DataFrames leverage the Spark SQL logical optimizer to intelligently plan the physical execution of operations to work well on large datasets" - databricks.com/blog/2015/03/13/announcing-spark-1-3.html
C
Community

Just as maasg says you can create a new DataFrame from the result of a map applied to the old DataFrame. An example for a given DataFrame df with two rows:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

Note that if the types of the columns change, you need to give it a correct schema instead of df.schema. Check out the api of org.apache.spark.sql.Row for available methods: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] Or using UDFs in Scala:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

and if the column name needs to stay the same you can rename it back:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")

D
DHEERAJ

importing col, when from pyspark.sql.functions and updating fifth column to integer(0,1,2) based on the string(string a, string b, string c) into a new DataFrame.

from pyspark.sql.functions import col, when 

data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))