Unexpected Apache Beam (Dataframe API) behavior gives tuples instead of dictionaries, breaking BigQuery


I'm learning Apache Beam with the dataframe API at the moment, and I've come across some unexpected behavior that I was hoping an expert could explain to me.

Here's the simplest version I could drill my issue down to (in the real version the dataframe transform is something more complex):

import apache_beam as beam
from typing import NamedTuple

from apache_beam.dataframe.transforms import DataframeTransform
from apache_beam.io import ReadFromBigQuery, WriteToBigQuery


class LocationRow(NamedTuple):
  h3_index: str

with beam.Pipeline(options=beam_options) as pipeline:
  (pipeline
    | ReadFromBigQuery(
        query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100', use_standard_sql=True)
        .with_output_types(LocationRow)
    | DataframeTransform(lambda df: df)
    | WriteToBigQuery(
        schema='h3_index:STRING',
        table=OUTPUT_TABLE))

Running this with DirectRunner (or DataflowRunner) crashes with the following:

message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://analysis-dataflow-temp/temp/bq_load/0163282c2bbc47ba8ec368b158aefe2e/core-modules-development.analysis.fake_grid_power_price/5a1fc783-dcdc-44bd-9855-faea6151574f'

So I looked into that file, and it's just a JSON list per line:

$ cat 5a1fc783-dcdc-44bd-9855-faea6151574f
["8800459aedfffff"]
["88004536c5fffff"]
["8800418237fffff"]
["8800422b39fffff"]
["8800432451fffff"]
["88004175d7fffff"]
...

I figured out that BigQuery is expecting an object per line (like {"h3_index": "88004175d7fffff"}), and if I remove the DataframeTransform in the pipeline it works. So I tried using print to figure out what's happening, and changed the pipeline to this:

with beam.Pipeline(options=beam_options) as pipeline:
  (pipeline
    | ReadFromBigQuery(
        query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 5', use_standard_sql=True)
        .with_output_types(LocationRow)
    | DataframeTransform(lambda df: df)
    | beam.Map(print))

Which gives this output:

BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806b00819fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806ab98d3fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806accd45fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806ac60a7fffff')
BeamSchema_574444a4_ae3e_4bb2_9cca_4867670ef2bb(h3_index='8806acb409fffff')

If I remove the DataframeTransform and keep the Map(print) I get this instead:

{'h3_index': '88012db281fffff'}
{'h3_index': '88012ea527fffff'}
{'h3_index': '88012e38c5fffff'}
{'h3_index': '88012e2135fffff'}
{'h3_index': '88012ea949fffff'}

So it looks like the DataframeTransform is returning collections of NamedTuples (or similar) rather than dictionaries, and WriteToBigQuery fails on these tuples. I can fix it by adding a Map after the DataframeTransform that converts them explicitly:

with beam.Pipeline(options=beam_options) as pipeline:
  (pipeline 
    | ReadFromBigQuery(
        query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100', use_standard_sql=True)
        .with_output_types(LocationRow)
    | DataframeTransform(lambda df: df)
    | beam.Map(lambda row: {'h3_index': row.h3_index})
    | WriteToBigQuery(
        schema='h3_index:STRING',
        table=OUTPUT_TABLE))

But this feels unnecessary, and I don't really understand what's happening here. What's the difference between a collection of tuples and one of dictionaries? Hoping a Beam expert can shed some light on this!

CodePudding user response:

The DataframeTransform allows using the Dataframe API, and it returns elements with a Beam schema (the generated NamedTuple-like rows you printed) rather than dictionaries.

Unfortunately, if you need Dicts after this step, you have to add a transformation afterwards that maps the Beam schema elements to Dicts, as you showed in your question.
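For instance, here is a field-agnostic version of that mapping step (a minimal sketch, assuming the elements coming out of DataframeTransform are NamedTuple instances, as the BeamSchema_... output above suggests; _asdict() then converts any schema'd row without hard-coding field names):

with beam.Pipeline(options=beam_options) as pipeline:
  (pipeline
    | ReadFromBigQuery(
        query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100', use_standard_sql=True)
        .with_output_types(LocationRow)
    | DataframeTransform(lambda df: df)
    # _asdict() is a NamedTuple method, so this converts any schema'd
    # row to a Dict without naming each field explicitly.
    | beam.Map(lambda row: row._asdict())
    | WriteToBigQuery(
        schema='h3_index:STRING',
        table=OUTPUT_TABLE))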

It can still be worth using the Dataframe API for its powerful syntax, for example:

from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
  ...
  | beam.Select(DOLocationID=lambda line: int(..),
                passenger_count=lambda line: int(..))
  | DataframeTransform(lambda df: df.groupby('DOLocationID').sum())
  | beam.Map(lambda row: f"{row.DOLocationID},{row.passenger_count}")

But to interact with many native Beam IOs like WriteToBigQuery, you have to transform your result from a Beam schema to another structure (a Dict or similar).

If you don't want to add the transformation from Beam schema to Dict before writing the data to BigQuery, then instead of using DataframeTransform, you can use a usual ParDo/DoFn or beam.Map with a Python method that contains your business logic and returns the result as a Dict, as sketched below.
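Here is a minimal sketch of that alternative, reusing the names from the question (the DoFn body is a placeholder for your real transformation logic; your print experiment above shows that ReadFromBigQuery already emits Dicts):

class ToOutputDict(beam.DoFn):
  def process(self, row):
    # Put your business logic here; yield plain Dicts so that
    # WriteToBigQuery can serialize each element as a JSON object.
    yield {'h3_index': row['h3_index']}

with beam.Pipeline(options=beam_options) as pipeline:
  (pipeline
    | ReadFromBigQuery(
        query=f'SELECT h3_index FROM {H3_INDEX_TABLE} LIMIT 100', use_standard_sql=True)
    | beam.ParDo(ToOutputDict())
    | WriteToBigQuery(
        schema='h3_index:STRING',
        table=OUTPUT_TABLE))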
