I am trying to join two dataframes:
df1, columns:
document_nbr, line_id, product_id, product_size, reference_nbr, local_cd, invoice_local_cost, invoice_delivery_id, created_by_id, transaction_ind, etl_tmst, record_created_tmst, record_updated_tmst, source_id, line_number, etl_date
Joining df1 with df2:
df2 columns:
document_nbr, line_id, variant_id, line_nbr as line_number
on columns:
df1.document_nbr == df2.document_nbr and df1.line_id == df2.line_id
Join type: left
I have a column line_number present in both dataframes, df1 and df2, and I want to drop that column from df1 and select it from df2.
To do that, I wrote the logic below to select all the columns except line_number from df1 plus line_number from df2:
df1 = df1.join(df2.alias('df2'),
               on=(df1.document_nbr == df2.document_nbr) & (df1.line_id == df2.line_id),
               how='left').\
    select([col(c) for c in df1.columns if c != 'line_number'], col("df2.line_number"))
When I run the code, I get the exception:
Invalid argument, not a string or column: [Column<b'document_nbr'>, Column<b'line_id'>, Column<b'product_id'>, Column<b'product_size'>, Column<b'reference_nbr'>, Column<b'local_cd'>, Column<b'invoice_local_cost'>, Column<b'invoice_delivery_id'>, Column<b'created_by_id'>, Column<b'transaction_ind'>, Column<b'etl_tmst'>, Column<b'record_created_tmst'>, Column<b'record_updated_tmst'>, Column<b'source_id'>, Column<b'etl_date'>] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
Traceback (most recent call last):
File "/local_disk0/tmp/spark-14a2e8a1-b280-4c83-a724-16976b1bc277/template_python-1.0.0-py3-none-any.whl/src/line_upd.py", line 66, in line_transformation
how='left').select([col(c) for c in df1.columns if c != 'line_number'], col("df2.line_number"))
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 1439, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 1233, in _jcols
return self._jseq(cols, _to_java_column)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 1220, in _jseq
return _to_seq(self.sql_ctx._sc, cols, converter)
File "/databricks/spark/python/pyspark/sql/column.py", line 68, in _to_seq
cols = [converter(c) for c in cols]
File "/databricks/spark/python/pyspark/sql/column.py", line 68, in <listcomp>
cols = [converter(c) for c in cols]
File "/databricks/spark/python/pyspark/sql/column.py", line 56, in _to_java_column
"function.".format(col, type(col)))
Could anyone let me know what mistake I am making here? How can I select all columns except line_number from df1 and the column line_number from df2 after the join?
CodePudding user response:
Just drop it after the join:
df1 = df1.join(df2.alias('df2'),
               on=(df1.document_nbr == df2.document_nbr) & (df1.line_id == df2.line_id),
               how='left').drop(df1.line_number)
df1.show()
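Passing the Column reference df1.line_number (rather than the string 'line_number') matters here: after the join both sides carry a line_number column, and the Column reference tells Spark unambiguously that df1's copy is the one to drop.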
With sample data:
df1 = spark.createDataFrame([(100, 'QC', 'Darzalex MM'),
                             (105, 'XY', 'INVOKANA'),
                             (107, 'CZ', 'Simponi RA'),
                             (117, 'NM', 'Guselkumab PSA'),
                             (118, 'YC', 'STELARA'),
                             (126, 'RF', 'INVOKANA')],
                            ('document_nbr', 'line_id', 'line_number'))
df1.show()
df2 = spark.createDataFrame([(118, 'YC', 'STELARA'),
                             (126, 'RF', 'INVOKANA'),
                             (131, 'VG', 'STELARA'),
                             (135, 'IJ', 'Stelara CD')],
                            ('document_nbr', 'line_id', 'line_number'))
df2.show()
df1 = df1.join(df2.alias('df2'),
               on=(df1.document_nbr == df2.document_nbr) & (df1.line_id == df2.line_id),
               how='left').drop(df1.line_number)
df1.show()
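With this data, the two df1 rows that match df2 on (document_nbr, line_id), namely (118, 'YC') and (126, 'RF'), pick up df2's line_number, while the unmatched rows show null in that column.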
CodePudding user response:
You can use select after the join to pick the columns you need, using aliases:
new_df = df1.alias('df1').join(df2.alias('df2'),
                               on=(df1.document_nbr == df2.document_nbr) & (df1.line_id == df2.line_id),
                               how='left').select('df1.document_nbr', 'df1.line_id', 'df2.line_number')
new_df will now have only 3 columns, but you can add as many as you like. If you want all columns from one dataframe, use 'df1.*'. Hope this helps!
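For completeness, the select() in the question failed because it was given a Python list plus a separate Column: select() accepts either several column arguments or a single list, not a mix of both. Below is a minimal sketch of the direct fix, unpacking the list with * (the variable names joined and result are illustrative):
from pyspark.sql.functions import col

# alias both sides so column names can be qualified after the join
joined = df1.alias('df1').join(
    df2.alias('df2'),
    on=(df1.document_nbr == df2.document_nbr) & (df1.line_id == df2.line_id),
    how='left')

# select() takes columns as separate arguments (or one single list), so
# unpack the comprehension with * instead of passing a list plus a Column
result = joined.select(
    *[col('df1.' + c) for c in df1.columns if c != 'line_number'],
    col('df2.line_number'))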