I'm learning Apache Beam with the dataframe API at the moment and have run into some unexpected behavior that I was hoping an expert could explain to me.
Here's the simplest version I could boil my issue down to (in the real version the dataframe transform is something more complex):
from typing import NamedTuple

import apache_beam as beam
from apache_beam.dataframe.transforms import DataframeTransform
from apache_beam.io import ReadFromBigQuery, WriteToBigQuery

class LocationRow(NamedTuple):
    h3_index: str

with beam.Pipeline(options=beam_options) as pipeline:
    (pipeline
     | ReadFromBigQuery(
         query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100',
         use_standard_sql=True)
       .with_output_types(LocationRow)
     | DataframeTransform(lambda df: df)
     | WriteToBigQuery(
         schema='h3_index:STRING',
         table=OUTPUT_TABLE))
Running this with DirectRunner (or DataflowRunner) crashes with the following:
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://analysis-dataflow-temp/temp/bq_load/0163282c2bbc47ba8ec368b158aefe2e/core-modules-development.analysis.fake_grid_power_price/5a1fc783-dcdc-44bd-9855-faea6151574f'
So, I looked into that file and it's just a JSON list per line:
$ cat 5a1fc783-dcdc-44bd-9855-faea6151574f
["8800459aedfffff"]
["88004536c5fffff"]
["8800418237fffff"]
["8800422b39fffff"]
["8800432451fffff"]
["88004175d7fffff"]
...
I figured out that BigQuery is expecting a JSON object per line (like {"h3_index": "88004175d7fffff"}), and if I remove the DataframeTransform from the pipeline it works. So I tried using print to figure out what's happening, and changed the pipeline to this:
with beam.Pipeline(options=beam_options) as pipeline:
    (pipeline
     | ReadFromBigQuery(
         query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 5',
         use_standard_sql=True)
       .with_output_types(LocationRow)
     | DataframeTransform(lambda df: df)
     | beam.Map(print))
Which gives this output:
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806b00819fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806ab98d3fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806accd45fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806ac60a7fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806acb409fffff')
If I remove the DataframeTransform and keep the Map(print), I get this instead:
{'h3_index': '88012db281fffff'}
{'h3_index': '88012ea527fffff'}
{'h3_index': '88012e38c5fffff'}
{'h3_index': '88012e2135fffff'}
{'h3_index': '88012ea949fffff'}
So it looks like the DataframeTransform is returning collections of NamedTuples (or similar) rather than dictionaries, and WriteToBigQuery fails on these tuples. I can fix it by adding a Map after the DataframeTransform to convert the rows explicitly:
with beam.Pipeline(options=beam_options) as pipeline:
    (pipeline
     | ReadFromBigQuery(
         query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100',
         use_standard_sql=True)
       .with_output_types(LocationRow)
     | DataframeTransform(lambda df: df)
     | beam.Map(lambda row: {'h3_index': row.h3_index})
     | WriteToBigQuery(
         schema='h3_index:STRING',
         table=OUTPUT_TABLE))
But this feels unnecessary, and I don't really understand what's happening here. What's the difference between a collection of tuples and one of dictionaries? Hoping a Beam expert can shed some light on this!
CodePudding user response:
The DataframeTransform lets you use the Dataframe API inside a Beam pipeline, and it returns a PCollection with a Beam schema (the generated NamedTuple-like rows you printed). Unfortunately, if you need Dicts after this step, you have to add a transformation afterwards to map the Beam schema rows back to Dicts, as you showed in your question.
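If you don't want to hardcode the field names in that mapping, here is a minimal sketch of a more generic conversion. It assumes the rows really are generated NamedTuple instances, as the printed output in the question suggests, so the standard NamedTuple _asdict() method is available:

with beam.Pipeline(options=beam_options) as pipeline:
    (pipeline
     | ReadFromBigQuery(
         query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100',
         use_standard_sql=True)
       .with_output_types(LocationRow)
     | DataframeTransform(lambda df: df)
     # Schema rows are NamedTuple instances, so _asdict() converts each
     # one to a Dict without naming the fields explicitly.
     | beam.Map(lambda row: dict(row._asdict()))
     | WriteToBigQuery(
         schema='h3_index:STRING',
         table=OUTPUT_TABLE))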
It can sometimes be interesting to use the Dataframe API for its powerful syntax, for example:

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
    ...
    | beam.Select(DOLocationID=lambda line: int(..),
                  passenger_count=lambda line: int(..))
    # include_indexes=True keeps the groupby key (DOLocationID) as a
    # field on the output rows so it can be accessed below.
    | DataframeTransform(lambda df: df.groupby('DOLocationID').sum(),
                         include_indexes=True)
    | beam.Map(lambda row: f"{row.DOLocationID},{row.passenger_count}")
But to be able to interact with many native Beam IOs like WriteToBigQuery, you have to transform your result schema into another structure (a Dict or something similar).
If you don't want to add the transformation from Beam schema to Dict before writing the data to BigQuery, then instead of using DataframeTransform you can use a usual ParDo/DoFn or beam.Map with a Python method containing your transformation business logic and returning the result as a Dict.
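Here is a minimal sketch of that alternative, reusing the question's pipeline. TransformRowFn is a hypothetical name, and its body is a placeholder for your business logic; since ReadFromBigQuery already emits Dicts, the DoFn only has to yield Dicts matching the sink schema:

class TransformRowFn(beam.DoFn):
    def process(self, row):
        # ReadFromBigQuery emits each row as a Dict; apply your business
        # logic here and yield a Dict matching the WriteToBigQuery schema.
        yield {'h3_index': row['h3_index']}

with beam.Pipeline(options=beam_options) as pipeline:
    (pipeline
     | ReadFromBigQuery(
         query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100',
         use_standard_sql=True)
     | beam.ParDo(TransformRowFn())
     | WriteToBigQuery(
         schema='h3_index:STRING',
         table=OUTPUT_TABLE))

This skips DataframeTransform entirely, so no schema-to-Dict conversion is needed before the sink.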