Dynamic array job size with AwsBatchOperator

Time:10-01

I have a compute job that I want to scale dynamically according to load.

Since the computation runs on AWS Batch, I want to use the array size parameter (array_properties). Here is how I do it with the AwsBatchOperator in Airflow:

task_batch = AwsBatchOperator(
    job_name=job_name,
    job_definition=job_def,
    job_queue=job_queue,
    array_properties={'size': " {{ ti.xcom_pull(task_ids='compute_arraysize') | int }}" },
    task_id="task_to_scale",
)

However, as far as I understand the documentation, the array_properties parameter isn't templated (I'm not sure why), so my template is not rendered and I get this error:

Invalid type for parameter arrayProperties.size, value:  {{ ti.xcom_pull(task_ids='compute_arraysize') | int }}, type: <class 'str'>, valid types: <class 'int'>

How can I fix this and set the size dynamically?

Answer:

The easiest local fix is to add "array_properties" to the template_fields sequence on line 104 of the AwsBatchOperator source. I will open a pull request to implement this in the Airflow code, but it will take some time for that to make its way to you.

[EDIT] The PR that fixes it permanently is here, if you want to follow its progress. It should be in the next Amazon provider package release, whenever that gets published.
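To see why the raw string reaches AWS untouched, here is a toy model of the mechanism. It is not Airflow's actual code: ToyOperator, ToyOperatorWithArray and the regex-based renderer are hypothetical stand-ins for BaseOperator and Jinja. The idea it illustrates is that, before execute() runs, only the attributes named in template_fields are passed through the template engine, and a subclass can extend the parent's tuple (the same trick the next answer uses).

```python
import re
from typing import Any, Dict, Sequence

# Hypothetical, simplified stand-in for Airflow templating: {{ name }} is
# replaced with str(context[name]). Real Airflow uses Jinja instead.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_string(value: str, context: Dict[str, Any]) -> str:
    """Substitute every {{ name }} placeholder with str(context[name])."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)


def render_value(value: Any, context: Dict[str, Any]) -> Any:
    """Recursively render strings nested inside dicts, lists and tuples."""
    if isinstance(value, str):
        return render_string(value, context)
    if isinstance(value, dict):
        return {k: render_value(v, context) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render_value(v, context) for v in value)
    return value


class ToyOperator:
    # Only job_name is whitelisted, mirroring an operator that omits array_properties.
    template_fields: Sequence[str] = ("job_name",)

    def __init__(self, job_name: str, array_properties: Dict[str, Any]):
        self.job_name = job_name
        self.array_properties = array_properties

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # Attributes not listed in template_fields are left exactly as passed in.
        for field in self.template_fields:
            setattr(self, field, render_value(getattr(self, field), context))


class ToyOperatorWithArray(ToyOperator):
    # Extend the parent's whitelist instead of replacing it.
    template_fields = ("array_properties",) + tuple(ToyOperator.template_fields)


if __name__ == "__main__":
    ctx = {"size": 5, "run": "r1"}
    plain = ToyOperator("job-{{ run }}", {"size": "{{ size }}"})
    plain.render_template_fields(ctx)
    print(plain.array_properties)    # {'size': '{{ size }}'}
    patched = ToyOperatorWithArray("job-{{ run }}", {"size": "{{ size }}"})
    patched.render_template_fields(ctx)
    print(patched.array_properties)  # {'size': '5'}
```

Note that even the patched toy yields the string '5', not the integer 5; default Jinja rendering behaves the same way, which matters for the boto3 type check.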

Answer:

As explained by feruzzi, the PR will solve the issue, and the fix will be available in

apache-airflow-providers-amazon>=6.0.1

However, you can already solve the issue in your current version by adding the needed parameter to the template fields in a subclass:

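# Older provider releases expose this class as AwsBatchOperator.
from airflow.providers.amazon.aws.operators.batch import BatchOperator


class MyBatchOperator(BatchOperator):
    # Add array_properties to the parent's templated fields so Jinja renders it.
    template_fields = (
        "array_properties",
    ) + tuple(BatchOperator.template_fields)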

Then you can do:

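task_batch = MyBatchOperator(
    job_name=job_name,
    job_definition=job_def,
    job_queue=job_queue,
    # No leading space inside the string. With default rendering the result is
    # still a str; render_template_as_native_obj=True on the DAG keeps it an int.
    array_properties={"size": "{{ ti.xcom_pull(task_ids='compute_arraysize') | int }}"},
    task_id="task_to_scale",
)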
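One caveat: even once array_properties is templated, Airflow's default Jinja rendering returns a string, so the `| int` filter alone can still produce "5" rather than 5 and trigger the same boto3 type error. In Airflow 2.1 and later, passing render_template_as_native_obj=True to the DAG makes Airflow render templates with Jinja's NativeEnvironment instead. The minimal sketch below uses plain jinja2 to show the difference; the `size` variable is an assumption standing in for the XCom pull. Keeping the template a single expression (no leading space) matters, since extra literal text can make native rendering fall back to returning text.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

# Same expression as in array_properties; a plain `size` variable stands in
# for ti.xcom_pull(task_ids='compute_arraysize') in this standalone demo.
TEMPLATE = "{{ size | int }}"


def render_default(template: str, **context):
    """Default Jinja rendering: Template.render always returns a str."""
    return Environment().from_string(template).render(**context)


def render_native(template: str, **context):
    """Native rendering: a template that is a single expression keeps its type."""
    return NativeEnvironment().from_string(template).render(**context)


if __name__ == "__main__":
    print(repr(render_default(TEMPLATE, size="5")))  # '5'  (str)
    print(repr(render_native(TEMPLATE, size="5")))   # 5    (int)
```

On the Airflow side, this corresponds to declaring the DAG with `DAG(..., render_template_as_native_obj=True)`; the other DAG arguments stay as they are.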