ChatGPT解决这个技术问题 Extra ChatGPT

如何更改 Spark SQL 的 DataFrame 中的列类型?

假设我正在做类似的事情:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment
1997 Ford  E350  Go get one now th...

但我真的希望 year 作为 Int(并且可能转换一些其他列)。

我能想到的最好的办法是

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

这有点令人费解。

我来自 R,我习惯于写作,例如

df2 <- df %>%
   mutate(year = year %>% as.integer,
          make = make %>% toupper)

我可能会遗漏一些东西,因为在 Spark/Scala 中应该有更好的方法来做到这一点......

我喜欢这种方式 spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")

m
msemelman

编辑:最新的最新版本

从 spark 2.x 开始,您应该在使用 Scala [1] 时改用 dataset api。在此处查看文档:

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

如果使用 python,即使更容易,我也会在此处留下链接,因为这是一个投票率很高的问题:

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]

[1] https://spark.apache.org/docs/latest/sql-programming-guide.html

在 Scala API 中,DataFrame 只是 Dataset[Row] 的类型别名。而在 Java API 中,用户需要使用 Dataset 来表示一个 DataFrame。

编辑:最新版本

从 spark 2.x 开始,您可以使用 .withColumn。在此处查看文档:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column):org.apache.spark.sql.DataFrame

最旧的答案

从 Spark 1.4 版开始,您可以在列上应用带有 DataType 的 cast 方法:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")

如果您使用的是 sql 表达式,您还可以执行以下操作:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")

有关详细信息,请查看文档:http://spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame


为什么你先用 withColumn 再用 drop?将 withColumn 与原始列名一起使用不是更容易吗?
无需删除列,然后重命名。您可以在一行中完成df.withColumn("ctr", temp("ctr").cast(DecimalType(decimalPrecision, decimalScale)))
在这种情况下,是否创建一个全新的数据框副本只是为了重铸一列?我错过了什么吗?或者也许在幕后有一些优化?
@user1814008 也许你想检查 stackoverflow.com/questions/30691385/internal-work-of-spark/… 。在那里,您可以找到有关 spark 转换和操作如何工作以及为什么应用转换不一定会创建新数据帧的深入解释。
通过 Spark 2.xdocsdf.withColumn(..) 可以根据 colName 参数添加或替换
S
Svend

[编辑:2016 年 3 月:感谢投票!虽然说真的,这不是最好的答案,但我认为 msemelman、Martin Senne 等人提出的基于 withColumnwithColumnRenamedcast 的解决方案更简单、更干净]。

我认为您的方法没问题,回想一下 Spark DataFrame 是行的(不可变的)RDD,所以我们从来没有真正替换列,只是每次创建新的 DataFrame新架构。

假设您有一个具有以下架构的原始 df :

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

并且一些 UDF 在一列或多列上定义:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )

更改列类型甚至从另一个构建新的 DataFrame 可以这样编写:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            

产生:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

这非常接近您自己的解决方案。简单地说,将类型更改和其他转换保持为单独的 udf val 会使代码更具可读性和可重用性。


这既不安全也不高效。 不安全,因为单个 NULL 或格式错误的条目会导致整个作业崩溃。 效率不高,因为 UDF 对 Catalyst 不透明。使用 UDF 进行复杂的操作很好,但没有理由将它们用于基本类型转换。这就是我们有 cast 方法的原因(参见 an answer by Martin Senne)。使事情对 Catalyst 透明需要更多的工作,但基本安全只是将 TryOption 投入工作的问题。
我没有看到任何与将字符串转换为日期相关的内容,例如“05-APR-2015”
有没有办法将您的 withColumn() 部分简化为遍历所有列的通用部分?
感谢 zero323,在阅读本文后,我想通了为什么这里的 udf 解决方案会崩溃。一些评论比一些关于 SO 的答案更好:)
有什么方法可以让我们了解损坏的行,这意味着记录在转换期间具有错误数据类型的列。由于 cast 函数使这些字段为空
d
danday74

由于 cast 操作可用于 Spark Column(我个人不赞成 @Svend 建议的 udf),如何:

df.select( df("year").cast(IntegerType).as("year"), ... )

转换为请求的类型?作为一个简洁的副作用,在这个意义上不可转换/“可转换”的值将变为 null

如果您需要它作为辅助方法,请使用:

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}

使用如下:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )

如果我需要转换和重命名一大堆列(我有 50 列,并且对 scala 来说相当新,不确定在不产生大量重复的情况下处理它的最佳方法是什么),您能否建议我如何继续?有些列应该保留字符串,有些应该转换为浮点数。
如何将字符串转换为日期,例如列中的“25-APR-2016”和“20160302”
@DmitrySmirnov 你有没有得到答案?我也有同样的问题。 ;)
@EvanZamir 不幸的是,我最终做了很多操作,以便能够在其他步骤中将数据用作 rdd。我想知道这些天这是否变得更容易了:)
W
WeiChing 林煒清

首先,如果你想转换类型,那么这个:

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))

使用相同的列名,该列将被替换为新列。您无需执行添加和删除步骤。

其次,关于 Scala vs R。这是与 RI 最相似的代码:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)

虽然代码长度比 R 长一点。这与语言的冗长无关。在 R 中,mutate 是 R 数据帧的特殊函数,而在 Scala 中,由于其表达能力,您可以轻松地临时使用它。
总之,它避免了特定的解决方案,因为语言设计对您来说已经足够好了快速轻松地构建您自己的领域语言。

旁注:df.columns 出人意料地是 Array[String] 而不是 Array[Column],也许他们希望它看起来像 Python pandas 的数据框。


你能给pyspark的等价物吗?
我的“age”字段得到“非法定义开始” .withColumn("age", $"age".cast(sql.types.DoubleType)) 。有什么建议吗?
如果我们出于性能原因在许多列上进行这些转换,您是否必须 .cache() 数据帧,或者 Spark 优化它们时不需要它?
导入可以是 import org.apache.spark.sql.types._,然后是 IntegerType 而不是 sql.types.IntegerType
d
dnlbrky

您可以使用 selectExpr 使其更简洁:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")

m
manishbelsare

用于将 DataFrame 的数据类型从 String 修改为 Integer 的 Java 代码

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))

它只会将现有的(字符串数据类型)转换为整数。


sql.types 中没有 DataTypes!它是 DataType。此外,可以简单地导入 IntegerType 并进行转换。
@EhsanM.Kermani 实际上 DatayaTypes.IntegerType 是一个合法的参考。
@Cupitor DataTypes.IntegerType 曾经在 DeveloperAPI mode,现在是 stable in v.2.1.0
这是最好的解决方案!
P
Piyush Patel

我认为这对我来说更具可读性。

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))

这会将您的年份列转换为 IntegerType,并创建任何临时列并删除这些列。如果要转换为任何其他数据类型,可以检查 org.apache.spark.sql.types 包中的类型。


P
Peter Rose

要将年份从字符串转换为整数,您可以在 csv 阅读器中添加以下选项:“inferSchema”-> “真”,见DataBricks documentation


这很好用,但问题是读者必须对您的文件进行第二次传递
@beefyhalo 绝对正确,有什么办法吗?
A
Andrej Kesely

生成一个包含五个值的简单数据集并将 int 转换为 string 类型:

val df = spark.range(5).select( col("id").cast("string") )

b
ben jarman

因此,这仅在您在保存到 sqlserver 之类的 jdbc 驱动程序时遇到问题时才真正有效,但它对于您在语法和类型方面遇到的错误非常有帮助。

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)

你能帮我用 Java 实现相同的代码吗?以及如何将 customJdbcDialect 注册到 DataFrame
不错,我对 Vertica 做了同样的事情,但自从 spark 2.1 之后。 JDbcUtil 您只需要实现您需要的特定数据类型。 dialect.getJDBCType(dt).orElse(getCommonJDBCType(dt)).getOrElse( throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.simpleString}"))
l
lxg

建议使用 cast 的答案,仅供参考,spark 1.4.1 中的 cast 方法已损坏。

例如,当转换为 bigint 时,字符串列的值为“8182175552014127960”的数据帧的值为“8182175552014128100”

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+

在发现这个错误之前,我们不得不面对很多问题,因为我们在生产中使用了 bigint 列。


psst,升级你的火花
@msemelman 对于一个小错误,不得不在生产中升级到新版本的 spark 是荒谬的。
我们不总是为小错误升级所有东西吗? :)
s
soulmachine
df.select($"long_col".cast(IntegerType).as("int_col"))

C
CodeChanger

您可以使用以下代码。

df.withColumn("year", df("year").cast(IntegerType))

这会将 year 列转换为 IntegerType 列。


E
Eric Bellet

使用 Spark Sql 2.4.0 你可以这样做:

spark.sql("SELECT STRING(NULLIF(column,'')) as column_string")

P
PirateJack

此方法将删除旧列并创建具有相同值和新数据类型的新列。创建 DataFrame 时我的原始数据类型是:-

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

在此之后,我运行以下代码来更改数据类型:-

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

在此之后,我的结果是: -

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)

您能否在这里提供您的解决方案。
M
Mehdi LAMRANI

这么多的答案,没有太多详尽的解释

以下语法在 Spark 2.4 中使用 Databricks Notebook

from pyspark.sql.functions import *
df = df.withColumn("COL_NAME", to_date(BLDFm["LOAD_DATE"], "MM-dd-yyyy"))

请注意,您必须指定您拥有的条目格式(在我的情况下为“MM-dd-yyyy”)并且导入是强制性的,因为 to_date 是 spark sql 函数

还尝试了这种语法,但得到了空值而不是正确的强制转换:

df = df.withColumn("COL_NAME", df["COL_NAME"].cast("Date"))

(请注意,我必须使用括号和引号使其语法正确) PS:我不得不承认这就像一个语法丛林,有很多可能的入口点方式,官方 API 参考缺乏适当的例子。


语法丛林。是的。这就是现在的 Spark 世界。
B
Bill Et Boule

另一种解决方案如下:

1)保持“inferSchema”为假

2)在行上运行'Map'函数时,您可以读取'asString'(row.getString ...)

//Read CSV and create dataset
Dataset<Row> enginesDataSet = sparkSession
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "true")
            .option("inferSchema","false")
            .load(args[0]);

JavaRDD<Box> vertices = enginesDataSet
            .select("BOX","BOX_CD")
            .toJavaRDD()
            .map(new Function<Row, Box>() {
                @Override
                public Box call(Row row) throws Exception {
                    return new Box((String)row.getString(0),(String)row.get(1));
                }
            });

r
remigiusz boguszewicz

为什么不按照 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast 中的说明进行操作

df.select(df.year.cast("int"),"make","model","comment","blank")

T
Tejasvi Sharma

可以通过在 spark sql 中使用 cast 来更改列的数据类型。表名是表,它有两列只有column1和column2,column1的数据类型要改变。 ex-spark.sql("select cast(column1 as Double) column1NewName,column2 from table") 代替 double 写入您的数据类型。


a
aschipfl

另一种方式:

// Generate a simple dataset containing five values and convert int to string type

val df = spark.range(5).select( col("id").cast("string")).withColumnRenamed("id","value")

c
cubic lettuce

如果您必须重命名由其名称给出的数十个列,以下示例采用@dnlbrky 的方法并将其一次应用于多个列:

df.selectExpr(df.columns.map(cn => {
    if (Set("speed", "weight", "height").contains(cn)) s"cast($cn as double) as $cn"
    else if (Set("isActive", "hasDevice").contains(cn)) s"cast($cn as boolean) as $cn"
    else cn
}):_*)

未铸造的列保持不变。所有列都保持原来的顺序。


A
Aravind Krishnakumar
    val fact_df = df.select($"data"(30) as "TopicTypeId", $"data"(31) as "TopicId",$"data"(21).cast(FloatType).as( "Data_Value_Std_Err")).rdd
    //Schema to be applied to the table
    val fact_schema = (new StructType).add("TopicTypeId", StringType).add("TopicId", StringType).add("Data_Value_Std_Err", FloatType)

    val fact_table = sqlContext.createDataFrame(fact_df, fact_schema).dropDuplicates()

R
Ravi

如果您想在不指定单个列名的情况下将特定类型的多个列更改为另一个列

/* Get names of all columns that you want to change type. 
In this example I want to change all columns of type Array to String*/
    val arrColsNames = originalDataFrame.schema.fields.filter(f => f.dataType.isInstanceOf[ArrayType]).map(_.name)

//iterate columns you want to change type and cast to the required type
val updatedDataFrame = arrColsNames.foldLeft(originalDataFrame){(tempDF, colName) => tempDF.withColumn(colName, tempDF.col(colName).cast(DataTypes.StringType))}

//display

updatedDataFrame.show(truncate = false)