I have 2 dataframes:

df contains all train routes, with origin and arrival columns (both ids and names).
df_relation contains the station name (Gare), relation and API number.

Goal: I need to join these two dataframes twice, on both the origin and the arrival columns.
I tried this:
df.groupBy("origin", "origin_id", "arrival", "direction") \
    .agg({'time_travelled': 'avg'}) \
    .filter(df.direction == 0) \
    .join(df_relation, df.origin == df_relation.Gare, "inner") \
    .join(df_relation, df.arrival == df_relation.Gare, "inner") \
    .orderBy("Relation") \
    .show()
But I got the following AnalysisException:

AnalysisException: Column Gare#1708 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via Dataset.as before joining them, and specify the column using qualified name, e.g. df.as("a").join(df.as("b"), $"a.id" > $"b.id"). You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
How to rewrite this?
I have unsuccessfully tried to blindly follow the error's recommendation, like this:
.as("a").join(df_relation.as("b"), $"a.arrival" == $"b.Gare", "inner")
This is my first dataframe (df):
+--------------------+----------+--------------------+---------+-------------------+
|              origin| origin_id|             arrival|direction|avg(time_travelled)|
+--------------------+----------+--------------------+---------+-------------------+
|        Gare du Nord|IDFM:71410|La Plaine Stade d...|      1.0| 262.22222222222223|
|        Gare du Nord|IDFM:71410|Aéroport CDG 1 (T...|      1.0| 1587.7551020408164|
|Villeparisis - Mi...|IDFM:68916|       Mitry - Claye|      1.0|              240.0|
|          Villepinte|IDFM:73547|Parc des Expositions|      1.0|  90.33898305084746|
|     Le Blanc-Mesnil|IDFM:72648|    Aulnay-sous-Bois|      1.0| 105.04273504273505|
|Aéroport CDG 1 (T...|IDFM:73596|Aéroport Charles ...|      1.0| 145.27777777777777|
+--------------------+----------+--------------------+---------+-------------------+
This is my second dataframe (df_relation):
+-----------------------------------------+--------+--------+
|Gare                                     |Relation|Gare Api|
+-----------------------------------------+--------+--------+
|Aéroport Charles de Gaulle 2 (Terminal 2)|1       |87001479|
|Aéroport CDG 1 (Terminal 3) - RER        |2       |87271460|
|Parc des Expositions                     |3       |87271486|
|Villepinte                               |4       |87271452|
+-----------------------------------------+--------+--------+
And this is what I am trying to achieve:
+--------------------+-----------+--------------------+---------+-------------------+--------+----------+-----------+
|              origin|  origin_id|             arrival|direction|avg(time_travelled)|Relation|Api origin|Api arrival|
+--------------------+-----------+--------------------+---------+-------------------+--------+----------+-----------+
|Aéroport Charles ...| IDFM:73699|Aéroport CDG 1 (T...|      0.0| 110.09345794392523|       1|  87001479|   87271460|
|Aéroport CDG 1 (T...| IDFM:73596|Parc des Expositions|      0.0| 280.17543859649123|       2|  87271460|   87271486|
|Aéroport CDG 1 (T...| IDFM:73596|        Gare du Nord|      0.0|             1707.4|       2|  87271460|   87271007|
|Parc des Expositions| IDFM:73568|          Villepinte|      0.0|  90.17543859649123|       3|  87271486|   87271452|
|          Villepinte| IDFM:73547|   Sevran Beaudottes|      0.0| 112.45614035087719|       4|  87271452|   87271445|
|   Sevran Beaudottes| IDFM:73491|    Aulnay-sous-Bois|      0.0| 168.24561403508773|       5|  87271445|   87271411|
|       Mitry - Claye| IDFM:69065|Villeparisis - Mi...|      0.0| 210.51724137931035|       6|  87271528|   87271510|
|Villeparisis - Mi...| IDFM:68916|         Vert Galant|      0.0|              150.0|       7|  87271510|   87271510|
+--------------------+-----------+--------------------+---------+-------------------+--------+----------+-----------+
CodePudding user response:
You take the original df and join df_relation twice. This way you create duplicate columns for every column in df_relation; "Gare" just happens to be the first of them, so it is the one named in the error message.
To avoid the error, you have to create aliases for your dataframes. Notice how I create them

df_agg.alias("agg")
df_relation.alias("rel_o")
df_relation.alias("rel_a")

and how I later reference them before every column.
from pyspark.sql import functions as F

# Aggregate first: average travel time per route, keeping direction 0 only
df_agg = (
    df.filter(F.col("direction") == 0)
    .groupBy("origin", "origin_id", "arrival", "direction")
    .agg({"time_travelled": "avg"})
)

# Join df_relation twice under two different aliases, one per role,
# and qualify every column with its alias to resolve the ambiguity
df_result = (
    df_agg.alias("agg")
    .join(df_relation.alias("rel_o"), F.col("agg.origin") == F.col("rel_o.Gare"), "inner")
    .join(df_relation.alias("rel_a"), F.col("agg.arrival") == F.col("rel_a.Gare"), "inner")
    .orderBy("rel_o.Relation")
    .select(
        "agg.*",
        "rel_o.Relation",
        F.col("rel_o.Gare Api").alias("Api origin"),
        F.col("rel_a.Gare Api").alias("Api arrival"),
    )
)
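Calling df_result.show() should then give the layout you describe. As an aside, if you prefer to avoid aliases altogether, an alternative is to rename df_relation's columns per role before joining, so the two copies never share a column name. A rough sketch of that variant (untested, reusing the names from above):

# Rename columns per role so no duplicate names can arise
rel_o = (
    df_relation.withColumnRenamed("Gare", "Gare_o")
    .withColumnRenamed("Gare Api", "Api origin")
)
rel_a = (
    df_relation.withColumnRenamed("Gare", "Gare_a")
    .withColumnRenamed("Gare Api", "Api arrival")
    .drop("Relation")  # keep only the origin-side Relation column
)

df_result = (
    df_agg.join(rel_o, df_agg.origin == rel_o.Gare_o, "inner")
    .join(rel_a, df_agg.arrival == rel_a.Gare_a, "inner")
    .orderBy("Relation")
    .drop("Gare_o", "Gare_a")
)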