假设我正在做类似的事情:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()
root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
year make model comment blank
2012 Tesla S No comment
1997 Ford E350 Go get one now th...
但我真的希望 year
作为 Int
(并且可能转换一些其他列)。
我能想到的最好的办法是
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]
这有点令人费解。
我来自 R,我习惯于写作,例如
df2 <- df %>%
mutate(year = year %>% as.integer,
make = make %>% toupper)
我可能会遗漏一些东西,因为在 Spark/Scala 中应该有更好的方法来做到这一点......
编辑:最新的最新版本
从 spark 2.x 开始,您应该在使用 Scala [1] 时改用 dataset api。在此处查看文档:
如果使用 python,即使更容易,我也会在此处留下链接,因为这是一个投票率很高的问题:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]
[1] https://spark.apache.org/docs/latest/sql-programming-guide.html:
在 Scala API 中,DataFrame 只是 Dataset[Row] 的类型别名。而在 Java API 中,用户需要使用 Dataset 来表示一个 DataFrame。
编辑:最新版本
从 spark 2.x 开始,您可以使用 .withColumn
。在此处查看文档:
最旧的答案
从 Spark 1.4 版开始,您可以在列上应用带有 DataType 的 cast 方法:
import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
.drop("year")
.withColumnRenamed("yearTmp", "year")
如果您使用的是 sql 表达式,您还可以执行以下操作:
val df2 = df.selectExpr("cast(year as int) year",
"make",
"model",
"comment",
"blank")
有关详细信息,请查看文档:http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame
[编辑:2016 年 3 月:感谢投票!虽然说真的,这不是最好的答案,但我认为 msemelman、Martin Senne 等人提出的基于 withColumn
、withColumnRenamed
和 cast
的解决方案更简单、更干净]。
我认为您的方法没问题,回想一下 Spark DataFrame
是行的(不可变的)RDD,所以我们从来没有真正替换列,只是每次创建新的 DataFrame
新架构。
假设您有一个具有以下架构的原始 df :
scala> df.printSchema
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- DayofMonth: string (nullable = true)
|-- DayOfWeek: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Distance: string (nullable = true)
|-- CRSDepTime: string (nullable = true)
并且一些 UDF 在一列或多列上定义:
import org.apache.spark.sql.functions._
val toInt = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
val days_since_nearest_holidays = udf(
(year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
)
更改列类型甚至从另一个构建新的 DataFrame 可以这样编写:
val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour", toHour(df("CRSDepTime")))
.withColumn("dayOfWeek", toInt(df("DayOfWeek")))
.withColumn("dayOfMonth", toInt(df("DayofMonth")))
.withColumn("month", toInt(df("Month")))
.withColumn("distance", toDouble(df("Distance")))
.withColumn("nearestHoliday", days_since_nearest_holidays(
df("Year"), df("Month"), df("DayofMonth"))
)
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
"month", "distance", "nearestHoliday")
产生:
scala> df.printSchema
root
|-- departureDelay: double (nullable = true)
|-- departureHour: integer (nullable = true)
|-- dayOfWeek: integer (nullable = true)
|-- dayOfMonth: integer (nullable = true)
|-- month: integer (nullable = true)
|-- distance: double (nullable = true)
|-- nearestHoliday: integer (nullable = true)
这非常接近您自己的解决方案。简单地说,将类型更改和其他转换保持为单独的 udf val
会使代码更具可读性和可重用性。
NULL
或格式错误的条目会导致整个作业崩溃。 效率不高,因为 UDF 对 Catalyst 不透明。使用 UDF 进行复杂的操作很好,但没有理由将它们用于基本类型转换。这就是我们有 cast
方法的原因(参见 an answer by Martin Senne)。使事情对 Catalyst 透明需要更多的工作,但基本安全只是将 Try
和 Option
投入工作的问题。
withColumn()
部分简化为遍历所有列的通用部分?
由于 cast
操作可用于 Spark Column
(我个人不赞成 @Svend
建议的 udf
),如何:
df.select( df("year").cast(IntegerType).as("year"), ... )
转换为请求的类型?作为一个简洁的副作用,在这个意义上不可转换/“可转换”的值将变为 null
。
如果您需要它作为辅助方法,请使用:
object DFHelper{
def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
df.withColumn( cn, df(cn).cast(tpe) )
}
}
使用如下:
import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
首先,如果你想转换类型,那么这个:
import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))
使用相同的列名,该列将被替换为新列。您无需执行添加和删除步骤。
其次,关于 Scala vs R。这是与 RI 最相似的代码:
val df2 = df.select(
df.columns.map {
case year @ "year" => df(year).cast(IntegerType).as(year)
case make @ "make" => functions.upper(df(make)).as(make)
case other => df(other)
}: _*
)
虽然代码长度比 R 长一点。这与语言的冗长无关。在 R 中,mutate
是 R 数据帧的特殊函数,而在 Scala 中,由于其表达能力,您可以轻松地临时使用它。
总之,它避免了特定的解决方案,因为语言设计对您来说已经足够好了快速轻松地构建您自己的领域语言。
旁注:df.columns
出人意料地是 Array[String]
而不是 Array[Column]
,也许他们希望它看起来像 Python pandas 的数据框。
import org.apache.spark.sql.types._
,然后是 IntegerType
而不是 sql.types.IntegerType
。
您可以使用 selectExpr
使其更简洁:
df.selectExpr("cast(year as int) as year", "upper(make) as make",
"model", "comment", "blank")
用于将 DataFrame 的数据类型从 String 修改为 Integer 的 Java 代码
df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
它只会将现有的(字符串数据类型)转换为整数。
sql.types
中没有 DataTypes
!它是 DataType
。此外,可以简单地导入 IntegerType
并进行转换。
我认为这对我来说更具可读性。
import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))
这会将您的年份列转换为 IntegerType
,并创建任何临时列并删除这些列。如果要转换为任何其他数据类型,可以检查 org.apache.spark.sql.types
包中的类型。
要将年份从字符串转换为整数,您可以在 csv 阅读器中添加以下选项:“inferSchema”-> “真”,见DataBricks documentation
生成一个包含五个值的简单数据集并将 int
转换为 string
类型:
val df = spark.range(5).select( col("id").cast("string") )
因此,这仅在您在保存到 sqlserver 之类的 jdbc 驱动程序时遇到问题时才真正有效,但它对于您在语法和类型方面遇到的错误非常有帮助。
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
// case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
}
}
JdbcDialects.registerDialect(SQLServerDialect)
建议使用 cast 的答案,仅供参考,spark 1.4.1 中的 cast 方法已损坏。
例如,当转换为 bigint 时,字符串列的值为“8182175552014127960”的数据帧的值为“8182175552014128100”
df.show
+-------------------+
| a|
+-------------------+
|8182175552014127960|
+-------------------+
df.selectExpr("cast(a as bigint) a").show
+-------------------+
| a|
+-------------------+
|8182175552014128100|
+-------------------+
在发现这个错误之前,我们不得不面对很多问题,因为我们在生产中使用了 bigint 列。
df.select($"long_col".cast(IntegerType).as("int_col"))
您可以使用以下代码。
df.withColumn("year", df("year").cast(IntegerType))
这会将 year 列转换为 IntegerType
列。
使用 Spark Sql 2.4.0 你可以这样做:
spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")
此方法将删除旧列并创建具有相同值和新数据类型的新列。创建 DataFrame 时我的原始数据类型是:-
root
|-- id: integer (nullable = true)
|-- flag1: string (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag3: string (nullable = true)
在此之后,我运行以下代码来更改数据类型:-
df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)
在此之后,我的结果是: -
root
|-- id: integer (nullable = true)
|-- flag2: string (nullable = true)
|-- name: string (nullable = true)
|-- flag1: boolean (nullable = true)
|-- flag3: boolean (nullable = true)
这么多的答案,没有太多详尽的解释
以下语法在 Spark 2.4 中使用 Databricks Notebook
from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))
请注意,您必须指定您拥有的条目格式(在我的情况下为“MM-dd-yyyy”)并且导入是强制性的,因为 to_date 是 spark sql 函数
还尝试了这种语法,但得到了空值而不是正确的强制转换:
df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))
(请注意,我必须使用括号和引号使其语法正确) PS:我不得不承认这就像一个语法丛林,有很多可能的入口点方式,官方 API 参考缺乏适当的例子。
另一种解决方案如下:
1)保持“inferSchema”为假
2)在行上运行'Map'函数时,您可以读取'asString'(row.getString ...)
//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
.read()
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema","false")
.load(args[0]);
JavaRDD<Box> vertices = enginesDataSet
.select("BOX","BOX_CD")
.toJavaRDD()
.map(new Function<Row, Box>() {
@Override
public Box call(Row row) throws Exception {
return new Box((String)row.getString(0),(String)row.get(1));
}
});
为什么不按照 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast 中的说明进行操作
df.select(df.year.cast("int"),"make","model","comment","blank")
可以通过在 spark sql 中使用 cast 来更改列的数据类型。表名是表,它有两列只有column1和column2,column1的数据类型要改变。 ex-spark.sql("select cast(column1 as Double) column1NewName,column2 from table") 代替 double 写入您的数据类型。
另一种方式:
// Generate a simple dataset containing five values and convert int to string type
val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")
如果您必须重命名由其名称给出的数十个列,以下示例采用@dnlbrky 的方法并将其一次应用于多个列:
df.selectExpr(df.columns.map(cn => {
if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
else cn
}):_*)
未铸造的列保持不变。所有列都保持原来的顺序。
val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
//Schema to be applied to the table
val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)
val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()
如果您想在不指定单个列名的情况下将特定类型的多个列更改为另一个列
/* Get names of all columns that you want to change type.
In this example I want to change all columns of type Array to String*/
val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)
//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}
//display
updatedDataFrame.show(truncate = false)
不定期副业成功案例分享
df.withColumn("ctr", temp("ctr").cast(DecimalType(decimalPrecision, decimalScale)))
Spark 2.x
的 docs,df.withColumn(..)
可以根据colName
参数添加或替换列