I am trying to add a running instance of MinIO to Airflow connections. I thought it would be as easy as this setup in the GUI (never mind the exposed credentials; this is a blocked-off environment and they will be changed afterwards):
Airflow and MinIO are both running in Docker containers that share the same Docker network. Pressing the test button results in the following error:
'ClientError' error occurred while testing connection: An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation: The security token included in the request is invalid.
I am curious about what I am missing. The idea was to set up this connection and then use a bucket for
CodePudding user response:
I am also facing this error in Airflow 2.5.0. I've found a workaround using the boto3 library, which is already built in.
First, I created a connection with these parameters:
Connection Id: any label (Minio in my case)
Connection Type: Generic
Host: MinIO server address and port, including the http:// or https:// scheme (boto3's endpoint_url needs a full URL)
Login: MinIO access key
Password: MinIO secret key
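If you would rather not click through the GUI, the same connection can probably be supplied through an environment variable. The sketch below assumes Airflow 2.3 or later, which accepts JSON-serialized connections in AIRFLOW_CONN_* variables. The host name `minio`, port 9000, and both keys are placeholders, not values from the question, and `build_minio_conn_json` is a helper name made up for this example.

```python
import json
import os


def build_minio_conn_json(host: str, access_key: str, secret_key: str) -> str:
    """Serialize a Generic connection as JSON for an AIRFLOW_CONN_* variable."""
    return json.dumps(
        {
            "conn_type": "generic",
            "host": host,
            "login": access_key,
            "password": secret_key,
        }
    )


# Placeholder values (assumptions), replace them with your own.
conn_json = build_minio_conn_json("http://minio:9000", "my-access-key", "my-secret-key")

# Airflow upper-cases the connection id when looking up the variable,
# so the id "Minio" corresponds to AIRFLOW_CONN_MINIO.
os.environ["AIRFLOW_CONN_MINIO"] = conn_json
print(conn_json)
```

In a Docker setup you would normally put the printed value into the Airflow containers' environment (for example in the compose file) rather than set it from Python.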
And here's my code:
import boto3
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection('Minio')
s3 = boto3.resource(
    's3',
    endpoint_url=conn.host,
    aws_access_key_id=conn.login,
    aws_secret_access_key=conn.password,
)
s3client = s3.meta.client

# and then you can use boto3 methods for manipulating buckets and files
# for example:
bucket = s3.Bucket('test-bucket')
# Iterates through all the objects, doing the pagination for you. Each obj
# is an ObjectSummary, so it doesn't contain the body. You'll need to call
# get to get the whole body.
for obj in bucket.objects.all():
    key = obj.key
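
One thing that can trip up the snippet above: boto3 rejects an endpoint_url that has no scheme, so a Host value like `minio:9000` would fail with an invalid endpoint error. A small guard along these lines may help. `to_endpoint_url` is a hypothetical helper name, and defaulting to plain http is an assumption that fits a local Docker network, not something stated in the question.

```python
def to_endpoint_url(host: str, default_scheme: str = "http") -> str:
    """Return host as a full URL, adding a scheme only when none is present."""
    host = host.strip().rstrip("/")
    if host.lower().startswith(("http://", "https://")):
        return host
    return f"{default_scheme}://{host}"


# Example with a placeholder host name.
print(to_endpoint_url("minio:9000"))
```

You would then pass `endpoint_url=to_endpoint_url(conn.host)` when creating the boto3 resource.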