I am not able to assign a series as a new column to a koalas dataframe. Below is the code I am using:
from databricks import koalas
dft=koalas.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft.assign(c=koalas.Series([1,2,3]))
output:
AnalysisException Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
700 type_pprinters=self.type_printers,
701 deferred_pprinters=self.deferred_printers)
--> 702 printer.pretty(obj)
703 printer.flush()
704 return stream.getvalue()
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in pretty(self, obj)
392 if cls is not object \
393 and callable(cls.__dict__.get('__repr__')):
--> 394 return _repr_pprint(obj, self, cycle)
395
396 return _default_pprint(obj, self, cycle)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
698 """A pprint that just redirects to the normal repr function."""
699 # Find newlines and replace them with p.break_()
--> 700 output = repr(obj)
701 lines = output.splitlines()
702 with p.group():
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in __repr__(self)
11661 return self._to_internal_pandas().to_string()
11662
> 11663 pdf = self._get_or_create_repr_pandas_cache(max_display_count)
11664 pdf_length = len(pdf)
11665 pdf = pdf.iloc[:max_display_count]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
11652 if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
11653 object.__setattr__(
> 11654 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
11655 )
11656 return self._repr_pandas_cache[n]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
5748 return DataFrame(self._internal.with_filter(F.lit(False)))
5749 else:
-> 5750 sdf = self._internal.resolved_copy.spark_frame
5751 if get_option("compute.ordered_head"):
5752 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
576 def wrapped_lazy_property(self):
577 if not hasattr(self, attr_name):
--> 578 setattr(self, attr_name, fn(self))
579 return getattr(self, attr_name)
580
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
1066 def resolved_copy(self) -> "InternalFrame":
1067 """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068 sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
1069 return self.copy(
1070 spark_frame=sdf,
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
1683 [Row(name='Alice', age=12), Row(name='Bob', age=15)]
1684 """
-> 1685 jdf = self._jdf.select(self._jcols(*cols))
1686 return DataFrame(jdf, self.sql_ctx)
1687
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
1307
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1311
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
343 method = get_real_method(obj, self.print_method)
344 if method is not None:
--> 345 return method()
346 return None
347 else:
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _repr_html_(self)
11684 return self._to_internal_pandas().to_html(notebook=True, bold_rows=bold_rows)
11685
> 11686 pdf = self._get_or_create_repr_pandas_cache(max_display_count)
11687 pdf_length = len(pdf)
11688 pdf = pdf.iloc[:max_display_count]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
11652 if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
11653 object.__setattr__(
> 11654 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
11655 )
11656 return self._repr_pandas_cache[n]
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
5748 return DataFrame(self._internal.with_filter(F.lit(False)))
5749 else:
-> 5750 sdf = self._internal.resolved_copy.spark_frame
5751 if get_option("compute.ordered_head"):
5752 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
576 def wrapped_lazy_property(self):
577 if not hasattr(self, attr_name):
--> 578 setattr(self, attr_name, fn(self))
579 return getattr(self, attr_name)
580
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
1066 def resolved_copy(self) -> "InternalFrame":
1067 """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068 sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
1069 return self.copy(
1070 spark_frame=sdf,
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
1683 [Row(name='Alice', age=12), Row(name='Bob', age=15)]
1684 """
-> 1685 jdf = self._jdf.select(self._jcols(*cols))
1686 return DataFrame(jdf, self.sql_ctx)
1687
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
1307
1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
1310 answer, self.gateway_client, self.target_id, self.name)
1311
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
115 # Hide where the exception came from that shows a non-Pythonic
116 # JVM exception message.
--> 117 raise converted from None
118 else:
119 raise
AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false
Can you help me understand what is going wrong with my approach and how to assign a new column to a koalas dataframe?
CodePudding user response:
I honestly don't know why you get that error with assign, but one way to add a new column to a koalas.DataFrame is standard item assignment with [''], as shown below. It is important to set the option compute.ops_on_diff_frames to True to allow operations on different Series/DataFrames.
import databricks.koalas as ks
ks.set_option('compute.ops_on_diff_frames', True)
dft = ks.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft['c'] = ks.Series([1,2,3])
dft
# a b c
# 0 1 3 1
# 1 2 4 2
# 2 3 5 3
CodePudding user response:
Unfortunately, you can only use expressions over existing columns of your dataframe in the assign method.
Explanation
The important part of your error stack is the Spark execution plan:
AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false
In a Spark execution plan, Project can be translated as SQL's SELECT. You can see that the plan fails at the second Project (Spark execution plans are read from bottom to top) because it couldn't find column 0#991184L (that is, the series you want to add to your dft dataframe) among the columns present in your dft dataframe, which are __index_level_0__#991164L, a#991165L, b#991166L and __natural_order__#991170L.
Indeed, column 0#991184L comes from a series you created out of the blue, not from a series derived from your dft dataframe. For Spark, this means the column comes from another dataframe, so you obviously can't retrieve it from your dft dataframe with a SELECT, which is precisely what Spark is trying to do.
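To see the same restriction in plain PySpark terms, here is a minimal sketch (df1 and df2 are hypothetical DataFrames, not taken from your code) where selecting a column that belongs to another DataFrame triggers the same kind of AnalysisException:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, 3), (2, 4), (3, 5)], ['a', 'b'])
df2 = spark.createDataFrame([(1,), (2,), (3,)], ['c'])
df1.select(df1['a'], df1['b'])             # fine: both columns resolve against df1
df1.select(df1['a'], df1['b'], df2['c'])   # AnalysisException: resolved attribute(s) c#... missing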
To link the pandas and Spark APIs, the Spark equivalent of assign would be the withColumn DataFrame method, whose documentation states:
The column expression must be an expression over this DataFrame; attempting to add a column from some other DataFrame will raise an error.
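That restriction can be sketched with withColumn too, reusing the hypothetical df1 and df2 from the snippet above:
df1.withColumn('c', df1['a'] * 2)   # works: an expression over df1's own columns
df1.withColumn('c', df2['c'])       # AnalysisException: the column belongs to another DataFrame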
So assign will work in the following cases:
dft=koalas.DataFrame({'a':[1,2,3],'b':[3,4,5]})
# Type of dft['a'] and dft['b'] is Series
dft.assign(c=dft['a'])
dft.assign(d=dft['a']*2)
dft.assign(e=dft['a']*dft['b'])
but not in the following cases:
dft=koalas.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft.assign(c=koalas.Series([1,2,3]))
import pandas as pd
dft2 = pd.DataFrame({'d': [1, 2, 3]})
# Type of dft2['d'] is Series
dft.assign(d=dft2['d'])
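Side note: assign also accepts callables that are evaluated against the dataframe itself, which stays within the "expression over this DataFrame" rule. A small sketch, with dft defined as above and assuming a koalas version where assign supports callables (recent versions do):
dft.assign(f=lambda x: x['a'] + x['b'])   # the lambda receives dft itself, so the columns resolve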
Workaround
Here the workaround is to do as explained in ric-s' answer and assign the column using dft['c'] = koalas.Series([1,2,3]).
This works because, in that case, Spark joins the two dataframes instead of merely selecting columns from the first one. As a join (here hidden by the koalas API) can be a very expensive operation in Spark, there is a guardrail that you need to override by setting compute.ops_on_diff_frames to True.
Setting compute.ops_on_diff_frames to True just tells koalas "I acknowledge that this operation is a join and may lead to poor performance". You can reset the option to its previous value after performing your operation with koalas.reset_option('compute.ops_on_diff_frames').
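Putting it together, a minimal sketch of the workaround using the same import style as your question:
from databricks import koalas
koalas.set_option('compute.ops_on_diff_frames', True)   # allow the implicit join
dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
dft['c'] = koalas.Series([1, 2, 3])                     # executed as a join under the hood
koalas.reset_option('compute.ops_on_diff_frames')       # restore the guardrail afterwards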