I've written these Python methods in my custom Airflow operator to convert a dictionary to a DataFrame, then to a StringIO object, and upload it to S3 as a CSV file without saving it locally.
from io import StringIO

import pandas as pd
from airflow.providers.amazon.aws.hooks.s3 import S3Hook  # import path assumes the Amazon provider package

def execute(self, context):
    s3_hook = S3Hook(aws_conn_id=self.s3_conn_id)
    # Pull the retailer name and the summary dict pushed by the upstream task
    retailer, d1 = context['task_instance'].xcom_pull(self.data_source)
    self._upload_file(d1, retailer, s3_hook)

def _upload_to_s3(self, df, s3_hook):
    # Write the DataFrame to an in-memory buffer instead of a local file
    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_hook.load_string(string_data=csv_buffer.getvalue(),
                        key=self.s3_key,
                        bucket_name=self.s3_bucket,
                        replace=True)

def _upload_file(self, d, retailer, s3_hook):
    self.s3_key = f"S3_STAGING/{retailer}/{retailer}_summary.csv"
    df = pd.DataFrame.from_dict(d, orient="index")
    df.index.name = 'product_code'
    self._upload_to_s3(df, s3_hook)
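For reference, here is roughly what ends up in the buffer for a toy input (the sample keys and values below are made up): to_csv writes a comma-delimited file with a header line and the product_code index as the first column.

from io import StringIO
import pandas as pd

d = {"A1": {"description": "Widget", "parent_company": "Acme"}}  # hypothetical sample data
df = pd.DataFrame.from_dict(d, orient="index")
df.index.name = "product_code"

buf = StringIO()
df.to_csv(buf)
print(buf.getvalue())
# product_code,description,parent_company
# A1,Widget,Acme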
The DAG runs and uploads the file successfully, and the file looks normal when I query it directly on S3. But when I try to query it in Snowflake:
select t.$1 as description,
t.$2 as parent_company
from @S3_STAGING/S3_STAGING/sample/sample_summary.csv as t
All columns are concatenated into one for some reason. Is there any way to fix this?
CodePudding user response:
Can you check whether you defined a specific field_delimiter for the stage? To rule that out, create a file format and use it explicitly:
create file format myformat type = 'csv';

select t.$1 as description,
       t.$2 as parent_company
from @S3_STAGING/S3_STAGING/sample/sample_summary.csv (file_format => 'myformat') t;
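If the columns still come back as a single value, the stage's own FIELD_DELIMITER is the usual suspect. As a sketch (the stage name is taken from the path above; the format options are assumptions matching what pandas.to_csv produces, i.e. comma-delimited with one header row; the column order is a guess based on the aliases in the question), you can inspect the stage and spell the options out:

-- Check the FIELD_DELIMITER / SKIP_HEADER currently attached to the stage
desc stage S3_STAGING;

-- File format matching a default pandas.to_csv output
create or replace file format myformat
  type = 'csv'
  field_delimiter = ','
  skip_header = 1                        -- to_csv writes a header line
  field_optionally_enclosed_by = '"';

select t.$1 as product_code,             -- to_csv also writes the index as the first column
       t.$2 as description,
       t.$3 as parent_company
from @S3_STAGING/S3_STAGING/sample/sample_summary.csv (file_format => 'myformat') t;

Note that because df.to_csv(csv_buffer) is called without index=False, the product_code index lands in the first CSV column, so the data columns are shifted right by one relative to the original query.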