我们如何连接 Apache Spark DataFrame 中的两列?我们可以使用 Spark SQL 中的任何函数吗?
对于原始 SQL,您可以使用 CONCAT
:
在 Python 中 df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v")) df.registerTempTable("df") sqlContext.sql("SELECT CONCAT(k, ' ', v) 来自 df")
在 Scala 中导入 sqlContext.implicits._ val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v") df.registerTempTable(" df") sqlContext.sql("SELECT CONCAT(k, ' ', v) FROM df")
从 Spark 1.5.0 开始,您可以将 concat
函数与 DataFrame API 一起使用:
在 Python 中:从 pyspark.sql.functions 导入 concat, col, lit df.select(concat(col("k"), lit(" "), col("v")))
在 Scala 中:导入 org.apache.spark.sql.functions.{concat, lit} df.select(concat($"k", lit(" "), $"v"))
还有一个 concat_ws
函数将字符串分隔符作为第一个参数。
这是自定义命名的方法
import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()
给,
+--------+--------+
|colname1|colname2|
+--------+--------+
| row11| row12|
| row21| row22|
+--------+--------+
通过连接创建新列:
df = df.withColumn('joined_column',
sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()
+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
| row11| row12| row11_row12|
| row21| row22| row21_row22|
+--------+--------+-------------+
在 Spark Scala 中连接字符串列的一种方法是使用 concat
。
有必要检查空值。因为如果其中一列为空,即使其他列之一确实有信息,结果也将为空。
使用 concat
和 withColumn
:
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))
使用 concat
和 select
:
val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")
使用这两种方法,您将拥有一个 NEW_COLUMN,其值是列的串联:来自原始 df 的 COL1 和 COL2。
concat_ws
而不是 concat
,则可以避免检查 NULL。
连接(*列)
v1.5 及更高版本
将多个输入列连接到一个列中。该函数适用于字符串、二进制和兼容的数组列。
例如:new_df = df.select(concat(df.a, df.b, df.c))
concat_ws(sep, *cols)
v1.5 及更高版本
与 concat
类似,但使用指定的分隔符。
例如:new_df = df.select(concat_ws('-', df.col1, df.col2))
map_concat(*cols)
v2.4 及更高版本
用于连接地图,返回所有给定地图的并集。
例如:new_df = df.select(map_concat("map1", "map2"))
使用 concat 运算符 (||
):
v2.3 及更高版本
例如:df = spark.sql("select col_a || col_b || col_c as abc from table_x")
如果您想使用 DF 执行此操作,您可以使用 udf 根据现有列添加新列。
val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)
//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))
//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )
//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
从 Spark 2.3(SPARK-22771) Spark SQL 支持连接运算符 ||
。
例如;
val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
这是为 pyspark 执行此操作的另一种方法:
#import concat and lit functions from pyspark.sql.functions
from pyspark.sql.functions import concat, lit
#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])
#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))
#Show the new data frame
personDF.show()
----------RESULT-------------------------
84
+------------+
|East African|
+------------+
| Ethiopian|
| Kenyan|
| Ugandan|
| Rwandan|
+------------+
当您不知道 Dataframe 中列的数量或名称时,这是一个建议。
val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
我们是否有对应于以下过程的java语法
val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
在 Spark 2.3.0 中,您可以:
spark.sql( """ select '1' || column_a from table_a """)
在 Java 中,您可以这样做来连接多个列。示例代码旨在为您提供一个场景以及如何使用它以便更好地理解。
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
.withColumn("concatenatedCol",
concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));
class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;
public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession.builder().config(sparkConf)
.getOrCreate();
}
return instance;
}
}
上面的代码将 col1、col2、col3 连接起来,用“_”分隔来创建一个名为“concatenatedCol”的列。
在我的情况下,我想要一个 Pipe-'I' 分隔的行。
from pyspark.sql import functions as F
df.select(F.concat_ws('|','_c1','_c2','_c3','_c4')).show()
这就像黄油上的热刀一样效果很好。
像这样使用 concat 方法:
Dataset<Row> DF2 = DF1
.withColumn("NEW_COLUMN",concat(col("ADDR1"),col("ADDR2"),col("ADDR3"))).as("NEW_COLUMN")
使用 sqlContext 在 pySpark 中执行此操作的另一种方法...
#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])
# Now we can concatenate columns and assign the new column a name
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))
确实,有一些漂亮的内置抽象供您完成连接,而无需实现自定义函数。既然你提到了 Spark SQL,所以我猜你正试图通过 spark.sql() 将它作为声明性命令传递。如果是这样,您可以直接通过 SQL 命令完成,例如:SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;
此外,从 Spark 2.3.0 开始,您可以使用以下行中的命令:SELECT col1 || col2 AS concat_column_name FROM <table_name>;
其中,是您首选的分隔符(也可以是空白空间),并且是您尝试从中读取的临时表或永久表。
我们也可以简单地使用 SelectExpr
。
df1.selectExpr("*","upper(_2||_3) as new")
我们可以在数据框的 select 方法中使用 concat()
val fullName = nameDF.select(concat(col("FirstName"), lit(" "), col("LastName")).as("FullName"))
使用 withColumn
和 concat
val fullName1 = nameDF.withColumn("FullName", concat(col("FirstName"), lit(" "), col("LastName")))
使用 spark.sql
连接函数
val fullNameSql = spark.sql("select Concat(FirstName, LastName) as FullName from names")
取自 https://www.sparkcodehub.com/spark-dataframe-concat-column
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))
注意:要使此代码正常工作,您需要将括号“()”放在“isNotNull”函数中。 -> 正确的是“isNotNull()”。
val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull(), col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull(), col("COL2")).otherwise(lit("null"))))
lit
创建一列_