我开始使用 Spark DataFrames,我需要能够旋转数据以从 1 列和多行中创建多列。 Scalding 中有内置功能,我相信 Python 中的 Pandas,但我找不到新的 Spark Dataframe 的任何东西。
我假设我可以编写某种自定义函数来做到这一点,但我什至不知道如何开始,特别是因为我是 Spark 的新手。如果有人知道如何使用内置功能或有关如何在 Scala 中编写内容的建议来做到这一点,我们将不胜感激。
As mentioned by David Anderson Spark 从 1.6 版开始提供 pivot
功能。一般语法如下所示:
df
.groupBy(grouping_columns)
.pivot(pivot_column, [values])
.agg(aggregate_expressions)
使用 nycflights13
和 csv
格式的用法示例:
Python:
from pyspark.sql.functions import avg
flights = (sqlContext
.read
.format("csv")
.options(inferSchema="true", header="true")
.load("flights.csv")
.na.drop())
flights.registerTempTable("flights")
sqlContext.cacheTable("flights")
gexprs = ("origin", "dest", "carrier")
aggexpr = avg("arr_delay")
flights.count()
## 336776
%timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
## 10 loops, best of 3: 1.03 s per loop
斯卡拉:
val flights = sqlContext
.read
.format("csv")
.options(Map("inferSchema" -> "true", "header" -> "true"))
.load("flights.csv")
flights
.groupBy($"origin", $"dest", $"carrier")
.pivot("hour")
.agg(avg($"arr_delay"))
爪哇:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;
Dataset<Row> df = spark.read().format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("flights.csv");
df.groupBy(col("origin"), col("dest"), col("carrier"))
.pivot("hour")
.agg(avg(col("arr_delay")));
R / SparkR:
library(magrittr)
flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)
flights %>%
groupBy("origin", "dest", "carrier") %>%
pivot("hour") %>%
agg(avg(column("arr_delay")))
R/sparklyr
library(dplyr)
flights <- spark_read_csv(sc, "flights", "flights.csv")
avg.arr.delay <- function(gdf) {
expr <- invoke_static(
sc,
"org.apache.spark.sql.functions",
"avg",
"arr_delay"
)
gdf %>% invoke("agg", expr, list())
}
flights %>%
sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate=avg.arr.delay)
SQL:
请注意,从 2.4 版开始支持 Spark SQL 中的 PIVOT 关键字。
CREATE TEMPORARY VIEW flights
USING csv
OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;
SELECT * FROM (
SELECT origin, dest, carrier, arr_delay, hour FROM flights
) PIVOT (
avg(arr_delay)
FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
);
示例数据:
"year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00
性能考虑:
一般来说,旋转是一项昂贵的操作。
如果可以,请尝试提供值列表,因为这样可以避免计算唯一性的额外命中:vs = list(range(25)) %timeit -n10 flight.groupBy(*gexprs ).pivot("hour", vs) .agg(aggexpr).count() ## 10 个循环,最好的 3 个:每个循环 392 毫秒
在某些情况下,重新分区和/或预先聚合数据被证明是有益的(在 2.0 或更高版本中可能不再值得努力)
仅用于重塑,您可以先使用:Pivot String column on Pyspark Dataframe
相关问题:
如何融化 Spark DataFrame?
在 spark-sql/pyspark 中反透视
使用 Spark 将列转置为行
我通过编写一个 for 循环来动态创建 SQL 查询来克服这个问题。说我有:
id tag value
1 US 50
1 UK 100
1 Can 125
2 US 75
2 UK 150
2 Can 175
而且我要:
id US UK Can
1 50 100 125
2 75 150 175
我可以创建一个包含我想要透视的值的列表,然后创建一个包含我需要的 SQL 查询的字符串。
val countries = List("US", "UK", "Can")
val numCountries = countries.length - 1
var query = "select *, "
for (i <- 0 to numCountries-1) {
query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "
}
query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"
myDataFrame.registerTempTable("myTable")
val myDF1 = sqlContext.sql(query)
我可以创建类似的查询然后进行聚合。不是一个非常优雅的解决方案,但它适用于任何值列表并且很灵活,也可以在调用代码时作为参数传入。
我已经通过以下步骤使用数据框解决了类似的问题:
为您的所有国家/地区创建列,并将“值”作为值:
import org.apache.spark.sql.functions._
val countries = List("US", "UK", "Can")
val countryValue = udf{(countryToCheck: String, countryInRow: String, value: Long) =>
if(countryToCheck == countryInRow) value else 0
}
val countryFuncs = countries.map{country => (dataFrame: DataFrame) => dataFrame.withColumn(country, countryValue(lit(country), df("tag"), df("value"))) }
val dfWithCountries = Function.chain(countryFuncs)(df).drop("tag").drop("value")
您的数据框“dfWithCountries”将如下所示:
+--+--+---+---+
|id|US| UK|Can|
+--+--+---+---+
| 1|50| 0| 0|
| 1| 0|100| 0|
| 1| 0| 0|125|
| 2|75| 0| 0|
| 2| 0|150| 0|
| 2| 0| 0|175|
+--+--+---+---+
现在您可以将所有值加在一起以获得所需的结果:
dfWithCountries.groupBy("id").sum(countries: _*).show
结果:
+--+-------+-------+--------+
|id|SUM(US)|SUM(UK)|SUM(Can)|
+--+-------+-------+--------+
| 1| 50| 100| 125|
| 2| 75| 150| 175|
+--+-------+-------+--------+
虽然这不是一个非常优雅的解决方案。我必须创建一个函数链来添加到所有列中。此外,如果我有很多国家,我会将我的临时数据集扩展到一个非常宽的集合,其中包含很多零。
有简单而优雅的解决方案。
scala> spark.sql("select * from k_tags limit 10").show()
+---------------+-------------+------+
| imsi| name| value|
+---------------+-------------+------+
|246021000000000| age| 37|
|246021000000000| gender|Female|
|246021000000000| arpu| 22|
|246021000000000| DeviceType| Phone|
|246021000000000|DataAllowance| 6GB|
+---------------+-------------+------+
scala> spark.sql("select * from k_tags limit 10").groupBy($"imsi").pivot("name").agg(min($"value")).show()
+---------------+-------------+----------+---+----+------+
| imsi|DataAllowance|DeviceType|age|arpu|gender|
+---------------+-------------+----------+---+----+------+
|246021000000000| 6GB| Phone| 37| 22|Female|
|246021000000001| 1GB| Phone| 72| 10| Male|
+---------------+-------------+----------+---+----+------+
有一个简单的旋转方法:
id tag value
1 US 50
1 UK 100
1 Can 125
2 US 75
2 UK 150
2 Can 175
import sparkSession.implicits._
val data = Seq(
(1,"US",50),
(1,"UK",100),
(1,"Can",125),
(2,"US",75),
(2,"UK",150),
(2,"Can",175),
)
val dataFrame = data.toDF("id","tag","value")
val df2 = dataFrame
.groupBy("id")
.pivot("tag")
.max("value")
df2.show()
+---+---+---+---+
| id|Can| UK| US|
+---+---+---+---+
| 1|125|100| 50|
| 2|175|150| 75|
+---+---+---+---+
有很多关于数据集/数据框的枢轴操作示例,但我找不到很多使用 SQL 的示例。这是一个对我有用的例子。
create or replace temporary view faang
as SELECT stock.date AS `Date`,
stock.adj_close AS `Price`,
stock.symbol as `Symbol`
FROM stock
WHERE (stock.symbol rlike '^(FB|AAPL|GOOG|AMZN)$') and year(date) > 2010;
SELECT * from faang
PIVOT (max(price) for symbol in ('AAPL', 'FB', 'GOOG', 'AMZN')) order by date;
最初我采用了 Al M 的解决方案。后来也抱着同样的想法,把这个函数改写为转置函数。
此方法使用键和值列将任何 df 行转换为任何数据格式的列
用于输入 csv
id,tag,value
1,US,50a
1,UK,100
1,Can,125
2,US,75
2,UK,150
2,Can,175
输出
+--+---+---+---+
|id| UK| US|Can|
+--+---+---+---+
| 2|150| 75|175|
| 1|100|50a|125|
+--+---+---+---+
转置方法:
def transpose(hc : HiveContext , df: DataFrame,compositeId: List[String], key: String, value: String) = {
val distinctCols = df.select(key).distinct.map { r => r(0) }.collect().toList
val rdd = df.map { row =>
(compositeId.collect { case id => row.getAs(id).asInstanceOf[Any] },
scala.collection.mutable.Map(row.getAs(key).asInstanceOf[Any] -> row.getAs(value).asInstanceOf[Any]))
}
val pairRdd = rdd.reduceByKey(_ ++ _)
val rowRdd = pairRdd.map(r => dynamicRow(r, distinctCols))
hc.createDataFrame(rowRdd, getSchema(df.schema, compositeId, (key, distinctCols)))
}
private def dynamicRow(r: (List[Any], scala.collection.mutable.Map[Any, Any]), colNames: List[Any]) = {
val cols = colNames.collect { case col => r._2.getOrElse(col.toString(), null) }
val array = r._1 ++ cols
Row(array: _*)
}
private def getSchema(srcSchema: StructType, idCols: List[String], distinctCols: (String, List[Any])): StructType = {
val idSchema = idCols.map { idCol => srcSchema.apply(idCol) }
val colSchema = srcSchema.apply(distinctCols._1)
val colsSchema = distinctCols._2.map { col => StructField(col.asInstanceOf[String], colSchema.dataType, colSchema.nullable) }
StructType(idSchema ++ colsSchema)
}
主要片段
import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructField
...
...
def main(args: Array[String]): Unit = {
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val dfdata1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true")
.load("data.csv")
dfdata1.show()
val dfOutput = transpose(new HiveContext(sc), dfdata1, List("id"), "tag", "value")
dfOutput.show
}
内置火花枢轴功能效率低下。下面的实现适用于 spark 2.4+ - 这个想法是聚合地图并将值提取为列。唯一的限制是它不处理透视列中的聚合函数,只处理列。
在 8M 表上,这些函数在 3 秒内应用,而在内置 spark 版本中需要 40 分钟:
# pass an optional list of string to avoid computation of columns
def pivot(df, group_by, key, aggFunction, levels=[]):
if not levels:
levels = [row[key] for row in df.filter(col(key).isNotNull()).groupBy(col(key)).agg(count(key)).select(key).collect()]
return df.filter(col(key).isin(*levels) == True).groupBy(group_by).agg(map_from_entries(collect_list(struct(key, expr(aggFunction)))).alias("group_map")).select([group_by] + ["group_map." + l for l in levels])
# Usage
pivot(df, "id", "key", "value")
pivot(df, "id", "key", "array(value)")
// pass an optional list of string to avoid computation of columns
def pivot(df: DataFrame, groupBy: Column, key: Column, aggFunct: String, _levels: List[String] = Nil): DataFrame = {
val levels =
if (_levels.isEmpty) df.filter(key.isNotNull).select(key).distinct().collect().map(row => row.getString(0)).toList
else _levels
df
.filter(key.isInCollection(levels))
.groupBy(groupBy)
.agg(map_from_entries(collect_list(struct(key, expr(aggFunct)))).alias("group_map"))
.select(groupBy.toString, levels.map(f => "group_map." + f): _*)
}
// Usage:
pivot(df, col("id"), col("key"), "value")
pivot(df, col("id"), col("key"), "array(value)")
Spark 一直在对 Pivoting the Spark DataFrame 进行改进。 Spark 1.6 版本的 Spark DataFrame API 中添加了一个数据透视函数,它存在性能问题,已在 Spark 2.0 中得到纠正
但是,如果您使用的是较低版本;请注意,pivot 是一项非常昂贵的操作,因此,建议提供列数据(如果已知)作为函数的参数,如下所示。
val countries = Seq("USA","China","Canada","Mexico")
val pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show()
这已在 Pivoting and Unpivoting Spark DataFrame 中详细解释
快乐学习!!