I am trying to insert data into a BigQuery table from an Airflow DAG using a plain Python operator, not the BigQuery operator, but I am unsure how to implement this.
I have written a simple DAG, and with the following operator I have managed to load the data from a GCS bucket into BigQuery, but I want to do this with a Python operator instead of the BigQuery one:
load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='cloud-samples-data',
    source_objects=['bigquery/us-states/us-states.csv'],
    destination_project_dataset_table='airflow_test.gcs_to_bq_table',
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag)
How can I achieve the above with a plain Python operator instead of the BigQuery operator?
CodePudding user response:
You can use the BigQuery Python client in a PythonOperator to load GCS files into BigQuery, for example:
PythonOperator(
    task_id="gcs_to_bq",
    op_kwargs={
        'dataset': 'dataset',
        'table': 'table'
    },
    python_callable=load_gcs_files_to_bq
)
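Note that load_gcs_files_to_bq has to be defined (or imported) in the DAG file before the operator references it, and the task still needs to be attached to a DAG, either with dag=dag as in your first snippet or by creating it inside a with DAG(...) block. A minimal sketch of the wiring, assuming Airflow 2-style imports and a hypothetical dag_id of gcs_to_bq_python:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator on Airflow 1.x


def load_gcs_files_to_bq(dataset, table):
    ...  # full implementation shown below


with DAG(
    dag_id="gcs_to_bq_python",      # hypothetical name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonOperator(
        task_id="gcs_to_bq",
        op_kwargs={"dataset": "dataset", "table": "table"},
        python_callable=load_gcs_files_to_bq,
    )

The callable itself then does the load with the BigQuery client: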
def load_gcs_files_to_bq(dataset, table):
    from google.cloud import bigquery

    # Construct a BigQuery client object.
    client = bigquery.Client()

    # TODO(developer): Set table_id to the ID of the table to create.
    table_id = f"your-project.{dataset}.{table}"

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("name", "STRING"),
            bigquery.SchemaField("post_abbr", "STRING"),
        ],
        skip_leading_rows=1,
        # The source format defaults to CSV, so the line below is optional.
        source_format=bigquery.SourceFormat.CSV,
    )

    uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)  # Make an API request.
    print("Loaded {} rows.".format(destination_table.num_rows))