cannot assign a koalas series as a new column in koalas


I am not able to assign a series as a new column to a koalas dataframe. Below is the code that I am using:

from databricks import koalas

dft=koalas.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft.assign(c=koalas.Series([1,2,3]))

output:

AnalysisException                         Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    392                         if cls is not object \
    393                                 and callable(cls.__dict__.get('__repr__')):
--> 394                             return _repr_pprint(obj, self, cycle)
    395 
    396             return _default_pprint(obj, self, cycle)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698     """A pprint that just redirects to the normal repr function."""
    699     # Find newlines and replace them with p.break_()
--> 700     output = repr(obj)
    701     lines = output.splitlines()
    702     with p.group():

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in __repr__(self)
  11661             return self._to_internal_pandas().to_string()
  11662 
> 11663         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  11664         pdf_length = len(pdf)
  11665         pdf = pdf.iloc[:max_display_count]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  11652         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
  11653             object.__setattr__(
> 11654                 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  11655             )
  11656         return self._repr_pandas_cache[n]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
   5748             return DataFrame(self._internal.with_filter(F.lit(False)))
   5749         else:
-> 5750             sdf = self._internal.resolved_copy.spark_frame
   5751             if get_option("compute.ordered_head"):
   5752                 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    576     def wrapped_lazy_property(self):
    577         if not hasattr(self, attr_name):
--> 578             setattr(self, attr_name, fn(self))
    579         return getattr(self, attr_name)
    580 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
   1066     def resolved_copy(self) -> "InternalFrame":
   1067         """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068         sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
   1069         return self.copy(
   1070             spark_frame=sdf,

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
   1683         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1684         """
-> 1685         jdf = self._jdf.select(self._jcols(*cols))
   1686         return DataFrame(jdf, self.sql_ctx)
   1687 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
~/miniconda3/envs/pyspark/lib/python3.9/site-packages/IPython/core/formatters.py in __call__(self, obj)
    343             method = get_real_method(obj, self.print_method)
    344             if method is not None:
--> 345                 return method()
    346             return None
    347         else:

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _repr_html_(self)
  11684             return self._to_internal_pandas().to_html(notebook=True, bold_rows=bold_rows)
  11685 
> 11686         pdf = self._get_or_create_repr_pandas_cache(max_display_count)
  11687         pdf_length = len(pdf)
  11688         pdf = pdf.iloc[:max_display_count]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in _get_or_create_repr_pandas_cache(self, n)
  11652         if not hasattr(self, "_repr_pandas_cache") or n not in self._repr_pandas_cache:
  11653             object.__setattr__(
> 11654                 self, "_repr_pandas_cache", {n: self.head(n + 1)._to_internal_pandas()}
  11655             )
  11656         return self._repr_pandas_cache[n]

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/frame.py in head(self, n)
   5748             return DataFrame(self._internal.with_filter(F.lit(False)))
   5749         else:
-> 5750             sdf = self._internal.resolved_copy.spark_frame
   5751             if get_option("compute.ordered_head"):
   5752                 sdf = sdf.orderBy(NATURAL_ORDER_COLUMN_NAME)

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/utils.py in wrapped_lazy_property(self)
    576     def wrapped_lazy_property(self):
    577         if not hasattr(self, attr_name):
--> 578             setattr(self, attr_name, fn(self))
    579         return getattr(self, attr_name)
    580 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/databricks/koalas/internal.py in resolved_copy(self)
   1066     def resolved_copy(self) -> "InternalFrame":
   1067         """ Copy the immutable InternalFrame with the updates resolved. """
-> 1068         sdf = self.spark_frame.select(self.spark_columns + list(HIDDEN_COLUMNS))
   1069         return self.copy(
   1070             spark_frame=sdf,

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/dataframe.py in select(self, *cols)
   1683         [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   1684         """
-> 1685         jdf = self._jdf.select(self._jcols(*cols))
   1686         return DataFrame(jdf, self.sql_ctx)
   1687 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    115                 # Hide where the exception came from that shows a non-Pythonic
    116                 # JVM exception message.
--> 117                 raise converted from None
    118             else:
    119                 raise

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false

Can you help me understand what is going wrong with my approach, and how to assign a new column to a koalas dataframe?

CodePudding user response:

I honestly don't know why you get that error with assign, but one way to add a new column to a koalas.DataFrame is standard bracket assignment (dft['c'] = ...), as shown below.
It is important to enable the option compute.ops_on_diff_frames to allow operations between different Series/DataFrames.

import databricks.koalas as ks
ks.set_option('compute.ops_on_diff_frames', True)

dft = ks.DataFrame({'a':[1,2,3],'b':[3,4,5]})
dft['c'] = ks.Series([1,2,3])

dft
#    a  b  c
# 0  1  3  1
# 1  2  4  2
# 2  3  5  3

CodePudding user response:

Unfortunately, in the assign method you can only use expressions over existing columns of your dataframe.

Explanation

The important part of your error stack is the Spark execution plan:

AnalysisException: Resolved attribute(s) 0#991184L missing from __index_level_0__#991164L,a#991165L,b#991166L,__natural_order__#991170L in operator !Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L].;
!Project [__index_level_0__#991164L, a#991165L, b#991166L, 0#991184L AS c#991191L, __natural_order__#991170L]
+- Project [__index_level_0__#991164L, a#991165L, b#991166L, monotonically_increasing_id() AS __natural_order__#991170L]
   +- LogicalRDD [__index_level_0__#991164L, a#991165L, b#991166L], false

In a Spark execution plan, Project can be translated as SQL's SELECT. You can see that the plan fails at the second Project (a Spark execution plan is read from bottom to top), because it couldn't find column 0#991184L (that is, the series you want to add to your dft dataframe) among the columns present in your dft dataframe, which are __index_level_0__#991164L, a#991165L, b#991166L and __natural_order__#991170L.

Indeed, column 0#991184L comes from a series you created out of the blue, not from a series derived from your dft dataframe. For Spark, that means the column belongs to another dataframe, so you obviously can't retrieve it from your dft dataframe with a SELECT, which is precisely what Spark is trying to do.
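
If you want to look at such a plan yourself, here is a small sketch (not part of the original answer): to_spark() converts the koalas dataframe to a Spark dataframe, and explain() prints its plan, including the Project nodes discussed above.

# Inspect the plan of a valid assign; the failing case above never gets this far.
dft.assign(c=dft['a'] * 2).to_spark().explain()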

To link the pandas and Spark APIs, the Spark equivalent of assign is the Spark dataframe method withColumn, whose documentation states:

The column expression must be an expression over this DataFrame; attempting to add a column from some other DataFrame will raise an error.
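
For illustration, here is a minimal plain PySpark sketch of the same restriction (the table contents are chosen arbitrarily and are not part of the original answer):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame([(1, 3), (2, 4), (3, 5)], ['a', 'b'])
other = spark.createDataFrame([(1,), (2,), (3,)], ['d'])

# OK: the expression only references columns of sdf itself
sdf.withColumn('c', F.col('a') * 2).show()

# AnalysisException: other['d'] belongs to a different DataFrame
# sdf.withColumn('c', other['d'])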

So assign will work for the following cases:

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})
# dft['a'] and dft['b'] are of type Series
dft.assign(c=dft['a'])
dft.assign(d=dft['a'] * 2)
dft.assign(e=dft['a'] * dft['b'])

but not in the following cases:

import pandas as pd

dft = koalas.DataFrame({'a': [1, 2, 3], 'b': [3, 4, 5]})

dft.assign(c=koalas.Series([1, 2, 3]))

dft2 = pd.DataFrame({'d': [1, 2, 3]})
# dft2['d'] is a Series, but it does not come from dft
dft.assign(d=dft2['d'])

Workaround

Here the workaround is to do as explained in ric-s' answer and assign the column using dft['c'] = koalas.Series([1,2,3]).

This works because, in that case, Spark joins the two dataframes instead of merely selecting columns from the first one. Since a join, here hidden behind the koalas API, can be a very expensive operation in Spark, there is a guardrail that you need to override by setting compute.ops_on_diff_frames to True.

Setting compute.ops_on_diff_frames to True just tells koalas "I acknowledge that this operation is a join and may lead to poor performance". You can reset the option afterwards with koalas.reset_option('compute.ops_on_diff_frames'), as in the sketch below.
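
A minimal sketch of that set/operate/reset pattern, reusing the dft dataframe from above:

from databricks import koalas

# Enable cross-frame operations just for this assignment...
koalas.set_option('compute.ops_on_diff_frames', True)

dft['c'] = koalas.Series([1, 2, 3])   # under the hood, koalas joins the two frames on their index

# ...then reset the option to its default so the guardrail stays in place afterwards.
koalas.reset_option('compute.ops_on_diff_frames')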
