I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.
I've tried the following without any success:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Also got an error using this:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
So how do I add a new column (based on Python vector) to an existing DataFrame with PySpark?
You cannot add an arbitrary column to a DataFrame
in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transforming an existing column:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
included using join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
or generated with function / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Performance-wise, built-in functions (pyspark.sql.functions
), which map to Catalyst expression, are usually preferred over Python user defined functions.
If you want to add content of an arbitrary RDD as a column you can
add row numbers to existing data frame
call zipWithIndex on RDD and convert it to data frame
join both using index as a join key
To add a column using a UDF:
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
elif value == 2: return 'cat2'
...
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
For Spark 2.0
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
df = df.select('*', (df.age + 10).alias('agePlusTen'))
you are effectively adding an arbitrary column as @zero323 warned us above was impossible, unless there's something wrong with doing this in Spark, in Pandas it's the standard way..
df.select('*', df.age + 10, df.age + 20)
There are multiple ways we can add a new column in pySpark.
Let's first create a simple DataFrame.
date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())
Now let's try to double the column value and store it in a new column. PFB few different approaches to achieve the same.
# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()
# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()
# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()
# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()
For more examples and explanation on spark DataFrame functions, you can visit my blog.
I hope this helps.
We can add additional columns to DataFrame directly with below steps:
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
To add new column with some custom value or dynamic value calculation which will be populated based on the existing columns.
e.g.
|ColumnA | ColumnB |
|--------|---------|
| 10 | 15 |
| 10 | 20 |
| 10 | 30 |
and new ColumnC as ColumnA+ColumnB
|ColumnA | ColumnB | ColumnC|
|--------|---------|--------|
| 10 | 15 | 25 |
| 10 | 20 | 30 |
| 10 | 30 | 40 |
using
#to add new column
def customColumnVal(row):
rd=row.asDict()
rd["ColumnC"]=row["ColumnA"] + row["ColumnB"]
new_row=Row(**rd)
return new_row
----------------------------
#convert DF to RDD
df_rdd= input_dataframe.rdd
#apply new fucntion to rdd
output_dataframe=df_rdd.map(customColumnVal).toDF()
input_dataframe
is the dataframe which will get modified and customColumnVal
function is having code to add new column.
You can define a new udf
when adding a column_name
:
u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))
StringType()
.
I would like to offer a generalized example for a very similar use case:
Use Case: I have a csv consisting of:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
I need to perform some transformations and the final csv needs to look like
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.
so:
1) I read the original csv using spark.read and call it "df".
2) I do something to the data.
3) I add the null columns using this script:
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
df = df.select(outcols)
In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).
The simplest way to add a column is to use "withColumn". Since the dataframe is created using sqlContext, you have to specify the schema or by default can be available in the dataset. If the schema is specified, the workload becomes tedious when changing every time.
Below is an example that you can consider:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")
# Check the change
Data.printSchema()
Success story sharing