How to filter using isin from another pyspark dataframe

Time:07-21

df1 has a lot of data, and I want to keep only the rows whose id is available in df2. Here's what I did:

df1.filter(col('id').isin(df2.select('id')))

Here's the error message:

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/column.py in isin(self, *cols)
    441         if len(cols) == 1 and isinstance(cols[0], (list, set)):
    442             cols = cols[0]
--> 443         cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
    444         sc = SparkContext._active_spark_context
    445         jc = getattr(self._jc, "isin")(_to_seq(sc, cols))

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/column.py in <listcomp>(.0)
    441         if len(cols) == 1 and isinstance(cols[0], (list, set)):
    442             cols = cols[0]
--> 443         cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
    444         sc = SparkContext._active_spark_context
    445         jc = getattr(self._jc, "isin")(_to_seq(sc, cols))

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/column.py in _create_column_from_literal(literal)
     33 def _create_column_from_literal(literal):
     34     sc = SparkContext._active_spark_context
---> 35     return sc._jvm.functions.lit(literal)
     36 
     37 

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1246 
   1247     def __call__(self, *args):
-> 1248         args_command, temp_args = self._build_args(*args)
   1249 
   1250         command = proto.CALL_COMMAND_NAME +\

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in _build_args(self, *args)
   1216 
   1217         args_command = "".join(
-> 1218             [get_command_part(arg, self.pool) for arg in new_args])
   1219 
   1220         return args_command, temp_args

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in <listcomp>(.0)
   1216 
   1217         args_command = "".join(
-> 1218             [get_command_part(arg, self.pool) for arg in new_args])
   1219 
   1220         return args_command, temp_args

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_command_part(parameter, python_proxy_pool)
    296             command_part += ";" + interface
    297     else:
--> 298         command_part = REFERENCE_TYPE + parameter._get_object_id()
    299 
    300     command_part += "\n"

/opt/cloudera/parcels/CDH-7.1.3-1.cdh7.1.3.p0.4992530/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
   1302         if name not in self.columns:
   1303             raise AttributeError(
-> 1304                 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
   1305         jc = self._jdf.apply(name)
   1306         return Column(jc)

AttributeError: 'DataFrame' object has no attribute '_get_object_id'

CodePudding user response:

Try this: df1.join(df2.select('id'), ['id'], 'inner')

CodePudding user response:

select() returns a DataFrame:

print(type(df2.select('id')))

# <class 'pyspark.sql.dataframe.DataFrame'>

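isin() expects literal values or Column objects, not a DataFrame, which is why the call fails. Hence you need to convert this into a list and pass that to isin():

from pyspark.sql import functions as F
# Collect the ids from df2 into a plain Python list on the driver
df2_id_list = df2.select('id').rdd.flatMap(lambda x: x).collect()
df1 = df1.filter(F.col('id').isin(df2_id_list))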

But collect() is not advisable here: it moves all the selected data to the driver, and the driver may run out of memory if df2 is large. It is therefore better to use a join with leftsemi. A left semi join keeps only the rows of df1 whose id has a match in df2 and returns only df1's columns, so duplicate ids in df2 do not produce duplicate rows in the result.

df1 = df1.join(df2, 'id', 'leftsemi')
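To see why leftsemi and the inner join from the first response can differ, here is a minimal plain-Python sketch of the two join semantics. It is a model only, not Spark code: the sample rows and the helper names inner_join and left_semi_join are made up for illustration, and rows are ordinary dicts rather than Spark Rows.

```python
# Plain-Python model of join semantics (assumption: rows are dicts, not Spark Rows).
df1_rows = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}, {"id": 3, "val": "c"}]
df2_rows = [{"id": 1}, {"id": 1}, {"id": 3}]  # id 1 appears twice in df2


def inner_join(left, right, key):
    """Hypothetical helper: one output row per matching (left, right) pair."""
    return [{**l, **r} for l in left for r in right if l[key] == r[key]]


def left_semi_join(left, right, key):
    """Hypothetical helper: each left row at most once, if any right row matches."""
    right_keys = {r[key] for r in right}
    return [l for l in left if l[key] in right_keys]


inner_result = inner_join(df1_rows, df2_rows, "id")
semi_result = left_semi_join(df1_rows, df2_rows, "id")

print([row["id"] for row in inner_result])  # [1, 1, 3]
print([row["id"] for row in semi_result])   # [1, 3]
```

In this model the inner join repeats the df1 row with id 1 because df2 contains that id twice, while the semi join returns it once. If the ids in df2 are known to be unique, or df2.select('id').distinct() is applied first, the inner join should return the same rows of df1.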