"Can not merge type" error when merging pandas dataframes


import pandas as pd
import numpy as np

pdf1 = pd.DataFrame({
    'id': np.array([1, 2, 3, 4, 6, 7], dtype=int),
    'name': np.array(['a', 'b', 'c', 'd', None, 'e'], dtype=str)
    })
print(pdf1.dtypes)

pdf2 = pd.DataFrame({
    'id': np.array([1, 2, 3, 5, 6, 7], dtype=int),
    'name': np.array(['k', 'l', 'm', 'm', 'o', None], dtype=str)
    })
print(pdf2.dtypes)

res_pdf = pdf1.join(pdf2, on = ['id'], how = 'outer', lsuffix="_x", rsuffix="_y",)
spark.createDataFrame(res_pdf).show()

results in

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_8156/3197760477.py in <module>
     17 
     18 res_pdf = pdf1.join(pdf2, on = ['id'], how = 'outer', lsuffix="_x", rsuffix="_y",)
---> 19 spark.createDataFrame(res_pdf).show()

/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    671         if has_pandas and isinstance(data, pandas.DataFrame):
    672             # Create a DataFrame from pandas DataFrame.
--> 673             return super(SparkSession, self).createDataFrame(
    674                 data, schema, samplingRatio, verifySchema)
    675         return self._create_dataframe(data, schema, samplingRatio, verifySchema)

/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    298                     raise
    299         data = self._convert_from_pandas(data, schema, timezone)
--> 300         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    301 
    302     def _convert_from_pandas(self, pdf, schema, timezone):

/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
    698             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    699         else:
--> 700             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    701         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    702         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    510 
    511         if schema is None or isinstance(schema, (list, tuple)):
--> 512             struct = self._inferSchemaFromList(data, names=schema)
    513             converter = _create_converter(struct)
    514             data = map(converter, data)

/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in _inferSchemaFromList(self, data, names)
    437         if not data:
    438             raise ValueError("can not infer schema from empty dataset")
--> 439         schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
    440         if _has_nulltype(schema):
    441             raise ValueError("Some of types cannot be determined after inferring")

/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py in _merge_type(a, b, name)
   1105     if isinstance(a, StructType):
   1106         nfs = dict((f.name, f.dataType) for f in b.fields)
-> 1107         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
   1108                                                   name=new_name(f.name)))
   1109                   for f in a.fields]

/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py in <listcomp>(.0)
   1105     if isinstance(a, StructType):
   1106         nfs = dict((f.name, f.dataType) for f in b.fields)
-> 1107         fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
   1108                                                   name=new_name(f.name)))
   1109                   for f in a.fields]

/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py in _merge_type(a, b, name)
   1100     elif type(a) is not type(b):
   1101         # TODO: type cast (such as int -> long)
-> 1102         raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
   1103 
   1104     # same type

TypeError: field name_y: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

I already specified the dtype explicitly, but pandas seems to ignore it (and why does it think the column is DoubleType?).
Any ideas what the reason is and how to fix this?

CodePudding user response:

You should define the Spark DataFrame's datatypes yourself. I think PySpark cannot infer the datatype because of the nulls in the 'name' columns. Use StructField(name, dataType, nullable):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# One StructField per column: (name, dataType, nullable)
schema = StructType([
    StructField('col1', IntegerType(), True),
    StructField('col2', StringType(), True)
])
sparkDf = spark.createDataFrame(res_pdf, schema=schema)
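
Note that the schema has to list one field per column of the merged frame, in order. For the join in the question, that is four columns (id_x, name_x, id_y, name_y), since both id and name overlap and get suffixed; the field name_y in the traceback comes from rsuffix="_y".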

CodePudding user response:

When you pre-define the datatype like you have (dtype=str), None gets cast to the string 'None'. So it's not a proper null value.
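
You can check that in plain NumPy:

import numpy as np

# dtype=str runs every element through str(), so None becomes the
# 4-character string 'None'
np.array(['a', 'b', None], dtype=str)
# array(['a', 'b', 'None'], dtype='<U4')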

We can fix that by doing:

# Turn the literal 'None' strings back into real missing values
pdf1.replace('None', np.nan, inplace=True)
pdf2.replace('None', np.nan, inplace=True)

Then we can merge them:

df = pdf1.merge(pdf2, on='id', how='outer')

It appears that PySpark doesn't like np.nan: nan is a float, so schema inference reads it as DoubleType, which then conflicts with the StringType inferred from the rows that contain actual strings.
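
A minimal reproduction of the same error, assuming an active SparkSession named spark (and Arrow conversion disabled, as in the traceback above):

import numpy as np
import pandas as pd

mixed = pd.DataFrame({'x': ['a', np.nan]})
# Row-by-row schema inference sees StringType for 'a' and DoubleType
# for nan, and the two cannot be merged:
# TypeError: field x: Can not merge type StringType and DoubleType
spark.createDataFrame(mixed).show()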

You can get around this by converting np.nan back to None:

# Use the dict form here: the scalar form df.replace(np.nan, None) is
# read by older pandas as value=None, i.e. "fill with the pad method",
# which forward-fills instead of inserting None
df.replace({np.nan: None}, inplace=True)
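
With proper None values, schema inference succeeds. For the sample data above, the output should look roughly like this (a sketch; null rendering varies by Spark version):

spark.createDataFrame(df).show()
# +---+------+------+
# | id|name_x|name_y|
# +---+------+------+
# |  1|     a|     k|
# |  2|     b|     l|
# |  3|     c|     m|
# |  4|     d|  null|
# |  5|  null|     m|
# |  6|  null|     o|
# |  7|     e|  null|
# +---+------+------+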