
How do I convert an array (i.e. list) column to Vector

Short version of the question!

Consider the following snippet (assuming spark is already set to some SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

Notice that the temperatures field is a list of floats. I would like to convert these lists of floats to the MLlib type Vector, and I'd like this conversion to be expressed using the basic DataFrame API rather than going via RDDs (which is inefficient because it sends all data from the JVM to Python, the processing is done in Python, we don't get the benefits of Spark's Catalyst optimizer, yada yada). How do I do this? Specifically:

1. Is there a way to get a straight cast working? See below for details (and a failed attempt at a workaround). Or is there any other operation that has the effect I was after?
2. Which is more efficient out of the two alternative solutions I suggest below (UDF vs exploding/reassembling the items in the list)? Or are there any other almost-but-not-quite-right alternatives that are better than either of them?

A straight cast doesn't work

This is what I would expect to be the "proper" solution. I want to convert the type of a column from one type to another, so I should use a cast. As a bit of context, let me remind you of the normal way to cast it to another type:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType())),
)

Now e.g. df_with_strings.collect()[0]["temperatures"][1] is '-2.0'. But if I cast to an ml Vector then things do not go so well:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

This gives an error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Yikes! Any ideas how to fix this?

Possible alternatives

Alternative 1: Using VectorAssembler

There is a Transformer that seems almost ideal for this job: the VectorAssembler. It takes one or more columns and concatenates them into a single vector. Unfortunately it only takes Vector and Float columns, not Array columns, so the following doesn't work:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

It gives this error:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

The best workaround I can think of is to explode the list into multiple columns and then use the VectorAssembler to collect them all back up again:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

This seems like it would be ideal, except that TEMPERATURE_COUNT can be more than 100, and sometimes more than 1000. (Another problem is that the code would be more complicated if you don't know the size of the array in advance, although that is not the case for my data.) Does Spark actually generate an intermediate data set with that many columns, or does it just consider this an intermediate step that individual items pass through transiently (or indeed does it optimise this step away entirely when it sees that the only use of these columns is to be assembled into a vector)?

Alternative 2: use a UDF

A rather simpler alternative is to use a UDF to do the conversion. This lets me express quite directly what I want to do in one line of code, and doesn't require making a data set with a crazy number of columns. But all that data has to be exchanged between Python and the JVM, and every individual number has to be handled by Python (which is notoriously slow for iterating over individual data items). Here is how that looks:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

Ignorable remarks

The remaining sections of this rambling question are some extra things I came up with while trying to find an answer. They can probably be skipped by most people reading this.

Not a solution: use Vector to begin with

In this trivial example it's possible to create the data using the vector type to begin with, but of course my data isn't really a Python list that I'm parallelizing, but instead is being read from a data source. But for the record, here is how that would look:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Inefficient solution: use map()

One possibility is to use the RDD map() method to transform the list to a Vector. This is similar to the UDF idea, except that it's even worse because the cost of serialisation etc. is incurred for all the fields in each row, not just the one being operated on. For the record, here's what that solution would look like:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Failed attempt at a workaround for cast

In desperation, I noticed that Vector is represented internally by a struct with four fields, but using a traditional cast from that type of struct doesn't work either. Here is an illustration (where I built the struct using a udf but the udf isn't the important part):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

This gives the error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
Can someone please post an answer on how to do this with Spark version 2.4.3+ using the DataFrame API?

user10938362

Personally I would go with Python UDF and wouldn't bother with anything else:

Vectors are not native SQL types so there will be performance overhead one way or another. In particular this process requires two steps where data is first converted from external type to row, and then from row to internal representation using generic RowEncoder.

Any downstream ML Pipeline will be much more expensive than a simple conversion. Moreover, it requires a process which is the opposite of the one described above.

But if you really want other options here you are:

Scala UDF with Python wrapper:

Install sbt following the instructions on the project site.

Create Scala package with following structure:

.
├── build.sbt
└── udfs.scala

Edit build.sbt (adjust to reflect Scala and Spark version):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "org.apache.spark" %% "spark-mllib" % "2.4.4"
)

Edit udfs.scala:

package com.example.spark.udfs

import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.DenseVector

object udfs {
  val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
}

Package:

sbt package

and include (or equivalent depending on Scala version):

$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

as an argument for --driver-class-path when starting shell / submitting application.

In PySpark define a wrapper:

from pyspark.sql.column import _to_java_column, _to_seq, Column
from pyspark import SparkContext

def as_vector(col):
    sc = SparkContext.getOrCreate()
    f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
    return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

Test:

with_vec = df.withColumn("vector", as_vector("temperatures"))
with_vec.show()

+--------+------------------+----------------+
|    city|      temperatures|          vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+

with_vec.printSchema()

root
 |-- city: string (nullable = true)
 |-- temperatures: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- vector: vector (nullable = true)

Dump data to a JSON format reflecting DenseVector schema and read it back:

from pyspark.sql.functions import to_json, from_json, col, struct, lit
from pyspark.sql.types import StructType, StructField
from pyspark.ml.linalg import VectorUDT

json_vec = to_json(struct(struct(
    lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
    col("temperatures").alias("values")
).alias("v")))

schema = StructType([StructField("v", VectorUDT())])

with_parsed_vector = df.withColumn(
    "parsed_vector", from_json(json_vec, schema).getItem("v")
)

with_parsed_vector.show()

+--------+------------------+----------------+
|    city|      temperatures|   parsed_vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+

with_parsed_vector.printSchema()

root
 |-- city: string (nullable = true)
 |-- temperatures: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- parsed_vector: vector (nullable = true)
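To make the intermediate format of the JSON approach concrete, here is a pure-Python sketch (no Spark involved) of the text that json_vec is expected to hold for one row: an outer field "v" wrapping a struct with the type tag and the values. The helper name dense_vector_json is hypothetical and exists only for this illustration.

```python
import json

# Type tags as given in the answer's comment: 1 is dense, 0 is sparse.
DENSE = 1
SPARSE = 0


def dense_vector_json(values):
    """Build the JSON text for one dense vector, wrapped in the outer field "v"."""
    payload = {"v": {"type": DENSE, "values": [float(x) for x in values]}}
    # Compact separators, so the string contains no spaces.
    return json.dumps(payload, separators=(",", ":"))


chicago_json = dense_vector_json([-1.0, -2.0, -3.0])
print(chicago_json)  # {"v":{"type":1,"values":[-1.0,-2.0,-3.0]}}
```

The size and indices fields of the vector struct are left out, just as in the answer's snippet, because only the type and values are supplied for a dense vector.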


f = sc._jvm.com.example.spark.udfs.udfs.as_vector() This line in the PySpark wrapper raises TypeError: 'JavaPackage' object is not callable. Do I need to install some Java package for this?
@user7348570 Sounds like a CLASSPATH issue.
I have this error as well. What's the workaround for it?
GGDammy

I had the same problem and solved it this way. It goes through an RDD transformation, so it is not suitable when performance is critical, but it works.

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

city_rdd = df.rdd.map(lambda row:row[0])
temp_rdd = df.rdd.map(lambda row:row[1])
new_df = city_rdd.zip(temp_rdd.map(lambda x:Vectors.dense(x))).toDF(schema=['city','temperatures'])

new_df

the result is,

DataFrame[city: string, temperatures: vector]

This is another option, thanks for mentioning it. But if performance isn't critical then you can also use a UDF as I mentioned in my question, which I expect is better than RDD in most cases. The RDD transformation sends all data to Python whereas a UDF sends just the relevant column. The RDD transformation also requires more code because you must say how to handle all columns, even the ones you want to leave unaffected, unlike a UDF. But an RDD transformation might be better if you want to manipulate lots of columns.
Stanislav Barabanov

For pyspark>=3.1.0

Since 3.1.0 there is a built-in solution: array_to_vector.

Given your case:

from pyspark.ml.functions import array_to_vector
df = df.withColumn("temperatures_vectorized", array_to_vector("temperatures"))

P.S: Also since 3.0.0 there is an opposite operation as well: vector_to_array