
pyspark dataframe filter or include based on list

I am trying to filter a dataframe in pyspark using a list. I want to either exclude the records whose value is in the list, or keep only the records whose value is in the list. My code below does not work:

# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [10,18,20]

# filter out records by scores by list l
records = df.filter(df.score in l)
# expected: (0,1), (0,1), (0,2), (1,2)

# include only records with these scores in list l
records = df.where(df.score in l)
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)

This gives the following error: ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.


Skippy le Grand Gourou

The error says that "df.score in l" cannot be evaluated: df.score gives you a Column, and Python's "in" operator is not defined for that type. Use the column's isin() method instead.

The code should be like this:

# define a dataframe
rdd = sc.parallelize([(0,1), (0,1), (0,2), (1,2), (1,10), (1,20), (3,18), (3,18), (3,18)])
df = sqlContext.createDataFrame(rdd, ["id", "score"])

# define a list of scores
l = [10,18,20]

# filter out records by scores by list l
records = df.filter(~df.score.isin(l))
# expected: (0,1), (0,1), (0,2), (1,2)

# include only records with these scores in list l
records = df.filter(df.score.isin(l))
# expected: (1,10), (1,20), (3,18), (3,18), (3,18)

Note that where() is an alias for filter(), so both are interchangeable.
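
To see why the original "in" fails, here is a toy sketch in plain Python. ToyColumn is a hypothetical stand-in written for illustration only, not PySpark's actual Column class; it just assumes the same general idea, that comparisons build a lazy expression instead of returning True or False, and that the expression refuses to be turned into a bool.

```python
class ToyColumn:
    """Hypothetical stand-in for a lazy column expression (NOT PySpark's Column)."""

    def __init__(self, expr):
        self.expr = expr

    def __eq__(self, other):
        # Comparing builds a new expression instead of returning True/False.
        return ToyColumn(f"({self.expr} = {other!r})")

    def __invert__(self):
        # "~" builds a negated expression.
        return ToyColumn(f"(NOT {self.expr})")

    def isin(self, values):
        # Membership is expressed as part of the expression itself.
        joined = ", ".join(repr(v) for v in values)
        return ToyColumn(f"({self.expr} IN ({joined}))")

    def __bool__(self):
        # An unevaluated expression has no truth value.
        raise ValueError("Cannot convert column into bool")


score = ToyColumn("score")

# "score in [...]" makes Python compare each list element with "==" and then
# call bool() on the result, which is where the ValueError comes from.
try:
    score in [10, 18, 20]
    in_operator_failed = False
except ValueError:
    in_operator_failed = True

include_expr = score.isin([10, 18, 20]).expr
exclude_expr = (~score.isin([10, 18, 20])).expr
```

Python's "in" always needs a plain True/False answer on the spot, while isin() and "~" only describe the condition, so it can be evaluated later against the data.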


How would you do this with a broadcast variable as a list instead of a regular python list? I'm getting a 'Broadcast' object has no attribute '_get_object_id' error when I try and do it that way.
@flyingmeatball I think you can use broadcast_variable_name.value to access the list.
If you want to use broadcasting, this is the way to go: l_bc = sc.broadcast(l) followed by df.where(df.score.isin(l_bc.value))
If you are trying to filter the dataframe based on a list of column values, this might help: stackoverflow.com/a/66228314/530399
Vzzarr

Based on @user3133475's answer, it is also possible to call the isin() function from F.col() like this:

import pyspark.sql.functions as F


l = [10,18,20]
df.filter(F.col("score").isin(l))

bantmen

I found the join implementation to be significantly faster than where for large dataframes:

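from pyspark.sql import SparkSession

def filter_spark_dataframe_by_list(df, column_name, filter_list):
    """ Returns subset of df where df[column_name] is in filter_list """
    spark = SparkSession.builder.getOrCreate()
    # a single-type schema gives a one-column DataFrame whose column is named "value"
    filter_df = spark.createDataFrame(filter_list, df.schema[column_name].dataType)
    # left_semi returns only df's columns and does not repeat rows for duplicate list entries
    return df.join(filter_df, df[column_name] == filter_df["value"], "left_semi")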