问题的简短版本!
考虑以下代码段(假设 spark
已设置为某个 SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
请注意,温度字段是浮点数列表。我想将这些浮点数列表转换为 MLlib 类型 Vector
,并且我希望使用基本的 DataFrame
API 而不是通过 RDD 来表达这种转换(这是低效的,因为它从JVM 到 Python,处理是在 Python 中完成的,我们没有得到 Spark 的 Catalyst 优化器的好处,yada yada)。我该怎么做呢?具体来说:
有没有办法让一个直接的演员工作?请参阅下文了解详细信息(以及尝试解决方法失败)?或者,有没有其他操作有我想要的效果?我在下面建议的两种替代解决方案中哪个更有效(UDF 与爆炸/重新组装列表中的项目)?还是有任何其他几乎但不完全正确的替代方案比它们中的任何一个都好?
直接投射不起作用
这就是我期望的“正确”解决方案。我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换。作为上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
现在例如 df_with_strings.collect()[0]["temperatures"][1]
是 '-7.0'
。但是,如果我转换为 ml Vector,那么事情就不那么顺利了:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
哎呀!任何想法如何解决这一问题?
可能的替代方案
备选方案 1:使用 VectorAssembler
有一个 Transformer
似乎非常适合这项工作:VectorAssembler
。它需要一列或多列并将它们连接成一个向量。不幸的是,它只需要 Vector
和 Float
列,而不是 Array
列,因此以下操作不起作用:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
它给出了这个错误:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
我能想到的最好的解决方法是将列表分解为多列,然后使用 VectorAssembler
将它们全部收集起来:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
这似乎是理想的,除了 TEMPERATURE_COUNT
超过 100,有时超过 1000。(另一个问题是,如果您事先不知道数组的大小,代码会更复杂,虽然我的数据不是这种情况。)Spark 是否真的生成了一个包含那么多列的中间数据集,或者它只是认为这是单个项目暂时通过的中间步骤(或者实际上它是否完全优化了这个远离步骤)看到这些列的唯一用途是组装成一个向量)?
备选方案 2:使用 UDF
一个更简单的替代方法是使用 UDF 进行转换。这让我可以在一行代码中非常直接地表达我想要做的事情,并且不需要制作一个包含大量列的数据集。但是所有这些数据都必须在 Python 和 JVM 之间交换,并且每个单独的数字都必须由 Python 处理(众所周知,Python 迭代单个数据项的速度很慢)。看起来是这样的:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
可忽略的备注
这个漫无边际的问题的其余部分是我在尝试寻找答案时想出的一些额外的东西。大多数阅读本文的人可能会跳过它们。
不是解决方案:使用 Vector 开头
在这个简单的示例中,可以使用向量类型开始创建数据,但当然我的数据并不是真正要并行化的 Python 列表,而是从数据源中读取的。但为了记录,这看起来是这样的:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
低效解决方案:使用 map()
一种可能性是使用 RDD map()
方法将列表转换为 Vector
。这类似于 UDF 的想法,只是它更糟糕,因为每行中的所有字段都会产生序列化等成本,而不仅仅是正在操作的字段。作为记录,这是该解决方案的样子:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
尝试解决演员阵容失败
无奈之下,我注意到 Vector
在内部由一个具有四个字段的结构表示,但使用该类型结构的传统转换也不起作用。这是一个插图(我使用 udf 构建了结构,但 udf 不是重要部分):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
这给出了错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
就我个人而言,我会使用 Python UDF,而不会打扰其他任何事情:
向量不是本机 SQL 类型,因此会以一种或另一种方式产生性能开销。特别是这个过程需要两个步骤,首先将数据从外部类型转换为行,然后使用通用 RowEncoder 从行转换为内部表示。
任何下游的 ML Pipeline 都将比简单的转换昂贵得多。此外,它需要一个与上述过程相反的过程
但如果你真的想要其他选择,你是:
带有 Python 包装器的 Scala UDF:按照项目站点上的说明安装 sbt。创建具有以下结构的 Scala 包:. ├── build.sbt └── udfs.scala 编辑 build.sbt(调整以反映 Scala 和 Spark 版本): scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% " spark-sql" % "2.4.4", "org.apache.spark" %% "spark-mllib" % "2.4.4" ) 编辑 udfs.scala:包 com.example.spark.udfs 导入 org.apache。 spark.sql.functions.udf import org.apache.spark.ml.linalg.DenseVector object udfs { val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray)) } 封装:sbt package并包括(或等效的,取决于 Scala 版本): $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar 作为启动 shell / 提交应用程序时 --driver-class-path 的参数。在 PySpark 中定义一个包装器: from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs .as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column))) 测试:with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show() +-- ------+------------------+----------------+ |城市|温度|矢量| +--------+------------------+----------------+ |芝加哥|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |纽约|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_vec.printSchema( ) 根 |-- 城市:字符串 (nullable = true) |-- 温度:数组 (nullable = true) | |-- 元素:双精度 (containsNull = true) |-- 向量:向量 (nullable = true)
将数据转储为反映 DenseVector 模式的 JSON 格式并将其读回: from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # 类型 1 是密集的,类型 0 是稀疏的 col("temperatures").alias("values") ).alias("v")) ) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn("parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show() +-- ------+------------------+----------------+ |城市|温度|解析向量| +--------+------------------+----------------+ |芝加哥|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]| |纽约|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]| +--------+------------------+----------------+ with_parsed_vector.printSchema( ) 根 |-- 城市:字符串 (nullable = true) |-- 温度:数组 (nullable = true) | |-- 元素:双精度(containsNull = true) |-- parsed_vector:向量(nullable = true)
我和你有同样的问题,我就是这样做的。这种方式包括 RDD 转换,因此对性能不是很重要,但它确实有效。
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])
new_df
结果是,
DataFrame[city: string, temperatures: vector]
对于 pyspark>=3.1.0
从 3.1.0 开始,有一个构建解决方案:array_to_vector。
鉴于您的情况:
from pyspark.ml.functions import vector_to_array
df = df.withColumn("temperatures_vectorized", vector_to_array("temperatures"))
PS:从 3.0.0 开始,还有一个相反的操作:vector_to_array
不定期副业成功案例分享
f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
Pyspark 方法中的这一行给出的错误为TypeError: 'JavaPackage' object is not callable
。我需要为此安装一些java包吗?