
How to join on multiple columns in Pyspark?

I am using Spark 1.3 and would like to join on multiple columns using the Python interface (SparkSQL).

The following works:

I first register them as temp tables.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')

I would now like to join them based on multiple columns.

I get SyntaxError: invalid syntax with this:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')

Community

You should use the & / | operators and be careful about operator precedence (== has lower precedence than bitwise AND and OR, so each equality comparison has to be wrapped in parentheses):

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()

## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## |  2|  b|3.0|  2|  b|0.0|
## +---+---+---+---+---+---+

When you say 'be careful about operator precedence', what do you mean? Do you mean I should put parentheses in the right place to AND the correct conditions together?
@Chogg, what he means is that if you're not careful with parentheses, the expression df1.x1 == df2.x1 & df1.x2 == df2.x2 (parentheses removed) would be evaluated by the Python interpreter as df1.x1 == (df2.x1 & df1.x2) == df2.x2, which would potentially throw a confusing and non-descriptive error.
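
To make the precedence issue concrete, here is a minimal sketch (using the same df1/df2 as above; the exact error message depends on the Spark version):

# Parenthesized: each equality is built first, then the results are combined with &.
ok = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))

# Without parentheses, Python parses the condition as
#   df1.x1 == (df2.x1 & df1.x2) == df2.x2
# i.e. a chained comparison. The implicit `and` between the two comparisons
# tries to convert a Column to a boolean, which fails (recent PySpark versions
# raise a ValueError suggesting '&' / '|' instead).
# bad = df1.join(df2, df1.x1 == df2.x1 & df1.x2 == df2.x2)
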
Florian

An alternative approach would be:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))

df = df1.join(df2, ['x1','x2'])
df.show()

which outputs:

+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

The main advantage is that the columns on which the tables are joined are not duplicated in the output, which reduces the risk of encountering errors such as org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.
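
As a rough sketch of the difference (same df1/df2 as above; error text varies by version):

# Joining on expressions keeps both copies of the join columns,
# so selecting them afterwards by bare name is ambiguous:
dup = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
# dup.select("x1")  # AnalysisException: Reference 'x1' is ambiguous

# Joining on a list of column names keeps a single copy of each join column:
df1.join(df2, ['x1', 'x2']).select("x1").show()  # unambiguous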

Whenever the columns in the two tables have different names (let's say in the example above, df2 has the columns y1, y2 and y4), you could use the following syntax:

df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])
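
Alternatively (a sketch; DataFrame.drop accepts Column arguments in Spark 2.x and later), you could keep the original names, join on an explicit condition, and drop the right-hand join columns afterwards:

df = (df1.join(df2, (df1.x1 == df2.y1) & (df1.x2 == df2.y2))
         .drop(df2.y1)
         .drop(df2.y2))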

What if I do an outer join and would like to keep only a single occurrence of the key?
This is probably my least favorite pyspark error: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L. I don't understand why it lets you do something like df = df1.join(df2, df1.x1 == df2.x1) and then errors as soon as you try to do almost anything with the resulting df. That's just a minor rant, but is there any reason why you'd ever want the resulting df with duplicated names?
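
If you do end up with duplicated column names, one workaround (a sketch, using the df1/df2 from this answer) is to reference the columns through DataFrame aliases rather than by bare name:

from pyspark.sql.functions import col

a, b = df1.alias("a"), df2.alias("b")
joined = a.join(b, (col("a.x1") == col("b.x1")) & (col("a.x2") == col("b.x2")))
joined.select(col("a.x1"), col("a.x3"), col("b.x4")).show()  # unambiguous references
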
Paul Roub
# Pass the join conditions as a list to `on`; they are combined with AND.
# `how` selects the join type ('inner', 'left', 'right', 'outer', ...).
test = numeric.join(Ref,
   on=[
     numeric.ID == Ref.ID,
     numeric.TYPE == Ref.TYPE,
     numeric.STATUS == Ref.STATUS
   ], how='inner')
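
For the DataFrames in the question, where the join columns have the same names in both tables, a list of column names also works (a sketch; passing a list of names to on requires a newer Spark release than the 1.3 mentioned in the question, and keeps a single copy of ID, TYPE and STATUS in the result):

test = numeric.join(Ref, on=['ID', 'TYPE', 'STATUS'], how='inner')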

Welcome to StackOverflow. Can you maybe explain your code a bit more? Why is it structured like this? How does it work? etc.
This answer is great, but as a best practice, please provide an explanation. Posting only code makes the OP and future readers copy and paste your answer without understanding the logic behind it. Please add some explanation. Thank you!