df1 has a lot of data. I want to keep only the rows whose id is available in df2. Here's what I did:
df1.filter(col('id').isin(df2.select('id')))
Here's the error message:
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/column.py in isin(self, *cols)
441 if len(cols) == 1 and isinstance(cols[0], (list, set)):
442 cols = cols[0]
--> 443 cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
444 sc = SparkContext._active_spark_context
445 jc = getattr(self._jc, "isin")(_to_seq(sc, cols))
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/column.py in <listcomp>(.0)
441 if len(cols) == 1 and isinstance(cols[0], (list, set)):
442 cols = cols[0]
--> 443 cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
444 sc = SparkContext._active_spark_context
445 jc = getattr(self._jc, "isin")(_to_seq(sc, cols))
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/column.py in _create_column_from_literal(literal)
33 def _create_column_from_literal(literal):
34 sc = SparkContext._active_spark_context
---> 35 return sc._jvm.functions.lit(literal)
36
37
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1246
1247 def __call__(self, *args):
-> 1248 args_command, temp_args = self._build_args(*args)
1249
1250 command = proto.CALL_COMMAND_NAME +\
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _build_args(self, *args)
1216
1217 args_command = "".join(
-> 1218 [get_command_part(arg, self.pool) for arg in new_args])
1219
1220 return args_command, temp_args
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in <listcomp>(.0)
1216
1217 args_command = "".join(
-> 1218 [get_command_part(arg, self.pool) for arg in new_args])
1219
1220 return args_command, temp_args
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
296 command_part += ";" + interface
297 else:
--> 298 command_part = REFERENCE_TYPE + parameter._get_object_id()
299
300 command_part += "\n"
/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
1302 if name not in self.columns:
1303 raise AttributeError(
-> 1304 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
1305 jc = self._jdf.apply(name)
1306 return Column(jc)
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
CodePudding user response:
Try this:
df1.join(df2.select('id'), ['id'], 'inner')
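A minimal sketch of how this behaves, assuming a local SparkSession and illustrative toy data (the values here are made up for the example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'val'])
df2 = spark.createDataFrame([(1,), (3,), (3,)], ['id'])

# keeps only df1 rows whose id appears in df2
df1.join(df2.select('id'), ['id'], 'inner').show()

Note that if df2 contains duplicate ids (as in this sketch), the inner join duplicates the matching df1 rows; deduplicate first with df2.select('id').distinct() if that matters.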
CodePudding user response:
select() returns a DataFrame:
print(type(df2.select('id')))
# <class 'pyspark.sql.dataframe.DataFrame'>
Hence you need to convert it into a Python list before passing it to isin():
from pyspark.sql import functions as F

df2_id_list = df2.select('id').rdd.flatMap(lambda x: x).collect()
df1 = df1.filter(F.col('id').isin(df2_id_list))
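An equivalent way to build the same list without going through the RDD API, shown as a sketch:

ids = [row['id'] for row in df2.select('id').distinct().collect()]
df1 = df1.filter(F.col('id').isin(ids))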
But collect() is not advisable: it moves all the selected data to the driver, which may run out of memory. It's better to use a join with leftsemi instead. A leftsemi join also prevents duplicate ids in df2 from producing duplicate rows in the result.
df1 = df1.join(df2, 'id', 'leftsemi')
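A minimal demonstration of the difference, assuming the same illustrative toy data as in the sketch above (df2 containing a duplicated id 3):

df1.join(df2, 'id', 'inner').count()     # 3: id 3 matches twice, so its row is duplicated
df1.join(df2, 'id', 'leftsemi').count()  # 2: each matching df1 row appears exactly once

A leftsemi join also returns only the columns of df1, so there is no risk of column-name clashes with df2.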