In its documentation, Apache Beam suggests the deadletter pattern when writing to BigQuery. With this pattern, you can fetch the rows that failed to be written from the transform's output under the 'FailedRows' tag.
However, when I try to use it like this:
    WriteToBigQuery(
        table=self.bigquery_table_name,
        schema={"fields": self.bigquery_table_schema},
        method=WriteToBigQuery.Method.FILE_LOADS,
        temp_file_format=FileFormat.AVRO,
    )
a schema mismatch in one of my elements causes the following exception:
Error message from worker: Traceback (most recent call last):
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630, in write
    self._avro_writer.write(row)
  File "fastavro/_write.pyx", line 647, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 376, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 320, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 374, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 283, in fastavro._write.write_union
ValueError: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1334, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/my_code/apache_beam/io/gcp/bigquery_file_loads.py", line 258, in process
    writer.write(row)
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1635, in write
    ex, self._avro_writer.schema, row)).with_traceback(tb)
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630, in write
    self._avro_writer.write(row)
  File "fastavro/_write.pyx", line 647, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 376, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 320, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 374, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 283, in fastavro._write.write_union
ValueError: Error writing row to Avro: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy Schema: ...
From what I gather, the schema mismatch causes fastavro._write.Writer.write to fail and raise an exception, which then fails the whole step. Instead, I would like WriteToBigQuery to apply the deadletter behavior and return my malformed rows as 'FailedRows' tagged output. Is there a way to achieve this?
Thanks
CodePudding user response:
Let's step back slightly and look at the aims and the outcome you want.
Why is FILE_LOADS required as the BigQuery write method?
Are you also aware of the BigQuery Storage Write API? See https://cloud.google.com/bigquery/docs/write-api
It looks like the Java SDK supports the BigQuery Storage Write API, but the Python SDK currently does not. I believe the Write API connects over gRPC to write into BigQuery, so there would be no need to serialize to Avro and then call the (legacy) batch load process.
Perhaps take a look and see if that helps. Schemas are important, but it seems Avro is irrelevant to your aims and is only involved because of the code path you are calling.
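If FILE_LOADS has to stay, one possible workaround is to check each row against the table schema yourself before it reaches WriteToBigQuery, and route the rows that do not fit to your own deadletter output. Below is a minimal sketch of such a check in plain Python. The names find_schema_error, split_rows and _ACCEPTED are hypothetical helpers, not part of Beam, and the type table is an assumption that only covers a few BigQuery column types.

```python
from typing import Any, Dict, List, Optional, Tuple

# Python types accepted for a few BigQuery column types.
# Assumption: deliberately incomplete, extend it for the types you use.
_ACCEPTED = {
    "STRING": (str,),
    "INTEGER": (int,),
    "FLOAT": (int, float),
    "BOOLEAN": (bool,),
}


def find_schema_error(row: Dict[str, Any],
                      fields: List[Dict[str, str]]) -> Optional[str]:
    """Return a description of the first mismatch, or None if the row fits."""
    for field in fields:
        name = field["name"]
        mode = field.get("mode", "NULLABLE")
        value = row.get(name)
        if value is None:
            if mode == "REQUIRED":
                return f"missing required field {name}"
            continue
        if mode == "REPEATED":
            if not isinstance(value, list):
                return f"field {name} should be a list"
            values = value
        else:
            values = [value]
        accepted = _ACCEPTED.get(field["type"])
        if accepted is None:
            continue  # column type not covered by this sketch, so not checked
        for item in values:
            # bool is a subclass of int, so reject it explicitly for numbers.
            is_stray_bool = isinstance(item, bool) and bool not in accepted
            if is_stray_bool or not isinstance(item, accepted):
                return f"field {name}: {item!r} does not match {field['type']}"
    return None


def split_rows(rows: List[Dict[str, Any]],
               fields: List[Dict[str, str]]
               ) -> Tuple[List[Dict[str, Any]], List[Tuple[Dict[str, Any], str]]]:
    """Split rows into (good rows, [(bad row, reason), ...])."""
    good, failed = [], []
    for row in rows:
        error = find_schema_error(row, fields)
        if error is None:
            good.append(row)
        else:
            failed.append((row, error))
    return good, failed
```

In a pipeline, the same check could live in a DoFn that yields good rows on the main output and wraps bad ones in beam.pvalue.TaggedOutput, so that only rows that pass are handed to WriteToBigQuery. This does not make WriteToBigQuery itself emit 'FailedRows' for FILE_LOADS; it only keeps rows like the one with safety_proxy set to [] away from the Avro writer.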
CodePudding user response:
I am only used to thinking about the 'deadletter' pattern as it relates to streaming. That's probably just a terminology nit-pick, as your message makes your intentions pretty clear.