Data Factory - pass multiple variables to foreach activity


I'm creating a pipeline in ADF, and I have 2 variables and 1 object.

The variables are arrays:

  1. CountryPartition: https://i.stack.imgur.com/7k1hm.png
  2. YearPartition: https://i.stack.imgur.com/r4B2o.png

And I have a Lookup activity that queries an Azure Storage table to get more parameters: https://i.stack.imgur.com/a5Csv.png

I want to dynamically combine CountryPartition, YearPartition and databricksPath and pass the result to a ForEach activity. With these example parameters it should run 16 times: 2 different Databricks paths for 4 countries and 2 years: https://i.stack.imgur.com/jqlLs.png

I tried to build this with the following expression:

@union(activity('Config Table Synapse Fact Query').output.value, array(concat('{"countryPartition" :',variables('CountryPartition'),'}')), array(concat('{"yearPartition" :',variables('YearsPartition'),'}')))

and I obtained this result: https://i.stack.imgur.com/ypytv.png

but when I tried to pass this result to the ForEach activity, I got an error: https://i.stack.imgur.com/w1PIn.png

ForEach activity: https://i.stack.imgur.com/exRka.png

Details: The expression 'item().countryPartition' cannot be evaluated because property 'countryPartition' doesn't exist; available properties are 'PartitionKey, RowKey, Timestamp, ID, databricksPath, databricksTable, onOff, synapseTable, tableType'.

It seems that it only considers the values in the first JSON object and not the other variables (countryPartition and yearPartition).

Can anyone please help me achieve this?

Thank you!

CodePudding user response:

The error occurs because @union was used to combine an array of objects with arrays of strings: concat('{"countryPartition" :',variables('CountryPartition'),'}') produces a string, not an object.

  • To overcome the error, convert each of those strings to JSON with the json() function:
@union(activity('Config Table Synapse Fact Query').output.value, array(json(concat('{"countryPartition" :',variables('CountryPartition'),'}'))), array(json(concat('{"yearPartition" :',variables('YearsPartition'),'}'))))
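
For illustration, if CountryPartition held a hypothetical value of ["a","b"], the two expressions would produce differently shaped arrays:

    array(concat('{"countryPartition" :', ..., '}'))
        -> [ "{\"countryPartition\" : [\"a\",\"b\"]}" ]     (an array holding one string)

    array(json(concat('{"countryPartition" :', ..., '}')))
        -> [ { "countryPartition": ["a","b"] } ]            (an array holding one object)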
  • But this does not give you the 16 combinations of Databricks path, country partition and year partition. You would need multiple loops to get all the required combinations (as in image 4).
  • Instead, you can use a mapping data flow to get the required result. Follow the procedure given below.
  • We already have 2 array variables for countryPartition and yearPartition (I took these as array parameters for the demo). I then used a ForEach activity with an Append Variable activity inside it to build an array of Databricks paths, as sketched below.

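A minimal sketch of those settings (the variable name paths is illustrative; the Lookup activity name matches the question):

    ForEach items:     @activity('Config Table Synapse Fact Query').output.value
    Append variable:   paths
    Value:             @item().databricksPath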

  • Now, I need to pass the paths, countryPartition and yearPartition arrays to the data flow, where I obtain the combination of each (16 combinations, as in image 4); see the parameter sketch below.

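In the Data Flow activity's Parameters tab, the three arrays can be mapped to the data flow parameters $path, $country and $year, roughly like this (use @pipeline().parameters.… instead if, as in the demo, they are pipeline parameters):

    path:     @variables('paths')
    country:  @variables('CountryPartition')
    year:     @variables('YearPartition')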

  • Inside my data flow, I used a sample source file with just one row (this file exists only to drive the transformations; its data does not affect the result).

  • Create 3 sources with this sample file and apply a derived column transformation to each of them. For source 1, I added a country column with the value unfold($country). Similarly for source 2 (a path column with the value unfold($path)) and source 3 (a year column with the value unfold($year)). A script sketch of source 1 follows below.

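In data flow script terms, source 1 and its derived column look roughly like this (the single dummy column in the source projection is illustrative, as are the stream names):

    source(output(dummy as string), allowSchemaDrift: true) ~> source1
    source1 derive(country = unfold($country)) ~> deriveCountry

unfold($country) expands the array parameter into one row per element, which is what turns the single-row sample file into one row per country.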

  • Use a select transformation on each of the above to keep only the required column (country, path and year respectively), as sketched below.

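For example, the select on the country branch keeps only that column (the other two branches are analogous; stream names are illustrative):

    deriveCountry select(mapColumn(country)) ~> selectCountry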

  • Now we have 3 streams, each containing one required column. Apply a join transformation (custom cross join with the condition true()): first cross join the streams containing country and path, then cross join that result with the year stream. A script sketch follows below.

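A rough script sketch of the two joins (stream names are illustrative, and the script generated by the UI may include additional options):

    selectCountry, selectPath join(true(), joinType:'cross', broadcast: 'auto') ~> joinCountryPath
    joinCountryPath, selectYear join(true(), joinType:'cross', broadcast: 'auto') ~> joinAll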

  • Select your storage account as the sink (in writeToCSV). The required result will be written to the sink as a CSV file; a minimal sink sketch follows below.
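A minimal sink definition might look like this in the script (the target dataset and linked service are configured on the sink's settings, not in the script body):

    joinAll sink(allowSchemaDrift: true, validateSchema: false) ~> writeToCSV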

  • The resulting CSV file (req_rows.csv) contains the required records.

Now you can use a Lookup activity to read this req_rows.csv file. Pass @activity('Lookup req csv').output.value as the items of a ForEach activity. Inside it, reference the path, country and year values as @item().path, @item().country and @item().year from your Databricks notebook activity, as sketched below.
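Putting it together, the outer ForEach and the notebook activity's base parameters could be configured like this (the parameter names match the CSV columns produced above):

    ForEach items:  @activity('Lookup req csv').output.value

    Notebook base parameters:
        path:     @item().path
        country:  @item().country
        year:     @item().year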
