import pandas as pd
import numpy as np
pdf1 = pd.DataFrame({
    'id': np.array([1, 2, 3, 4, 6, 7], dtype=int),
    'name': np.array(['a', 'b', 'c', 'd', None, 'e'], dtype=str)
})
print(pdf1.dtypes)
pdf2 = pd.DataFrame({
    'id': np.array([1, 2, 3, 5, 6, 7], dtype=int),
    'name': np.array(['k', 'l', 'm', 'm', 'o', None], dtype=str)
})
print(pdf2.dtypes)
res_pdf = pdf1.join(pdf2, on = ['id'], how = 'outer', lsuffix="_x", rsuffix="_y",)
spark.createDataFrame(res_pdf).show()
This results in:
TypeError Traceback (most recent call last)
/tmp/ipykernel_8156/3197760477.py in <module>
17
18 res_pdf = pdf1.join(pdf2, on = ['id'], how = 'outer', lsuffix="_x", rsuffix="_y",)
---> 19 spark.createDataFrame(res_pdf).show()
/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
671 if has_pandas and isinstance(data, pandas.DataFrame):
672 # Create a DataFrame from pandas DataFrame.
--> 673 return super(SparkSession, self).createDataFrame(
674 data, schema, samplingRatio, verifySchema)
675 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/conversion.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
298 raise
299 data = self._convert_from_pandas(data, schema, timezone)
--> 300 return self._create_dataframe(data, schema, samplingRatio, verifySchema)
301
302 def _convert_from_pandas(self, pdf, schema, timezone):
/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
698 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
699 else:
--> 700 rdd, schema = self._createFromLocal(map(prepare, data), schema)
701 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
702 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in _createFromLocal(self, data, schema)
510
511 if schema is None or isinstance(schema, (list, tuple)):
--> 512 struct = self._inferSchemaFromList(data, names=schema)
513 converter = _create_converter(struct)
514 data = map(converter, data)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py in _inferSchemaFromList(self, data, names)
437 if not data:
438 raise ValueError("can not infer schema from empty dataset")
--> 439 schema = reduce(_merge_type, (_infer_schema(row, names) for row in data))
440 if _has_nulltype(schema):
441 raise ValueError("Some of types cannot be determined after inferring")
/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py in _merge_type(a, b, name)
1105 if isinstance(a, StructType):
1106 nfs = dict((f.name, f.dataType) for f in b.fields)
-> 1107 fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
1108 name=new_name(f.name)))
1109 for f in a.fields]
/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py in <listcomp>(.0)
1105 if isinstance(a, StructType):
1106 nfs = dict((f.name, f.dataType) for f in b.fields)
-> 1107 fields = [StructField(f.name, _merge_type(f.dataType, nfs.get(f.name, NullType()),
1108 name=new_name(f.name)))
1109 for f in a.fields]
/opt/spark/python/lib/pyspark.zip/pyspark/sql/types.py in _merge_type(a, b, name)
1100 elif type(a) is not type(b):
1101 # TODO: type cast (such as int -> long)
-> 1102 raise TypeError(new_msg("Can not merge type %s and %s" % (type(a), type(b))))
1103
1104 # same type
TypeError: field name_y: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
I already specified the dtype explicitly, but pandas seems to ignore it (and why does Spark think the column is DoubleType?).
Any ideas what the reason is and how to fix this?
CodePudding user response:
You should define the Spark DataFrame schema explicitly. I think PySpark cannot infer the datatype because of the nulls in the 'name' columns. Use StructField(name, dataType, nullable):
from pyspark.sql.types import *
schema = StructType([
    StructField('col1', IntegerType(), True),
    StructField('col2', StringType(), True)
])
sparkDf = spark.createDataFrame(res_pdf, schema=schema)
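The field names and types above are placeholders. For the joined frame from the question, the schema has to match the columns the join actually produces (id_x, name_x, id_y, name_y), and the NaN values the outer join introduces have to become real None values first. A minimal sketch along those lines; the DoubleType for the id fields is an assumption that follows from the outer join upcasting those columns to float64:

from pyspark.sql.types import StructType, StructField, DoubleType, StringType

# res_pdf is the outer-joined frame from the question.
# Convert every NaN to a proper None so the nullable fields accept it.
res_pdf = res_pdf.astype(object).where(res_pdf.notna(), None)

schema = StructType([
    StructField('id_x', DoubleType(), True),
    StructField('name_x', StringType(), True),
    StructField('id_y', DoubleType(), True),
    StructField('name_y', StringType(), True)
])

spark.createDataFrame(res_pdf, schema=schema).show()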
CodePudding user response:
When you pre-define the datatype like you have (dtype=str), None gets interpreted as the string 'None', so it's not a proper null value.
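You can see the stringification directly in a tiny check (the array here is just for illustration):

import numpy as np
arr = np.array(['a', None, 'b'], dtype=str)
print(arr)                # ['a' 'None' 'b'] -- numpy stringified None
print(arr[1] == 'None')   # True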
We can fix that by doing:
pdf1.replace('None', np.nan, inplace=True)
pdf2.replace('None', np.nan, inplace=True)
Then we can merge them (note that merge joins on the 'id' column itself, whereas join in the question matched pdf1's 'id' values against pdf2's index):
df = pdf1.merge(pdf2, on='id', how='outer')
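At this point the name columns are object dtype but still carry NaN for the unmatched ids, which is what Spark later infers as DoubleType. You can check with something like:

print(df.dtypes)
print(df[df.isna().any(axis=1)])   # rows where NaN survived the merge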
It appears that PySpark doesn't like np.nan, since it infers it as a DoubleType.
You can get around this by converting every np.nan to None. The dict form of replace is used here because, with a scalar to_replace and value=None, pandas falls back to method-based padding instead of inserting None:
df.replace({np.nan: None}, inplace=True)
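Putting it all together, a minimal end-to-end sketch of this approach (assuming spark is an existing SparkSession, and letting Spark infer the schema from the cleaned frame):

import numpy as np
import pandas as pd

pdf1 = pd.DataFrame({
    'id': np.array([1, 2, 3, 4, 6, 7], dtype=int),
    'name': np.array(['a', 'b', 'c', 'd', None, 'e'], dtype=str)
})
pdf2 = pd.DataFrame({
    'id': np.array([1, 2, 3, 5, 6, 7], dtype=int),
    'name': np.array(['k', 'l', 'm', 'm', 'o', None], dtype=str)
})

# dtype=str turned None into the literal string 'None'; make it a real missing value
pdf1.replace('None', np.nan, inplace=True)
pdf2.replace('None', np.nan, inplace=True)

# merge on the 'id' column itself
df = pdf1.merge(pdf2, on='id', how='outer')

# NaN would be inferred as DoubleType by Spark, so convert it to None
df = df.replace({np.nan: None})

spark.createDataFrame(df).show()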