How to pass result of Sql query from Azure Synapse notebook to next activity in Synapse Pipeline?

Time:12-12

I have a Main pipeline in Synapse workspace which has 2 activities:

1st - Notebook activity

2nd - If Condition activity

For the first one (a Synapse notebook on a Spark pool, using PySpark), I have a SQL cell with a simple join query:

%%sql
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id

This will return some rows to me (< 50 rows)

Now I want to access this result set of fewer than 50 rows in the "If Condition" activity in the Synapse pipeline. How do I do this?

According to the docs, I should be able to use the following:

@activity('Notebook1').output.status.Output.result.exitValue

But the exitValue that I get in the Synapse notebook output is null. How do I access this result set in the "If Condition" activity, then?

CodePudding user response:

  • You have to return the value from the notebook using mssparkutils.notebook.exit in order to access it from your pipeline with @activity('Notebook1').output.status.Output.result.exitValue. Without that call, exitValue stays null.

  • Instead of using an SQL cell, you can use spark.sql and store the result in a dataframe using df = spark.sql(Query).

  • You can either choose to return the entire dataframe data to pipeline, or just the number of records using dataframe.count() (if you want to verify the count of records).

mssparkutils.notebook.exit(str(df.count()))  # where df is the dataframe

  • If you want to return the entire data as an array of objects, so that the pipeline can iterate over it or use it in other operations, you can use the following code:

x = df.toPandas()
json_str = x.to_json(orient='records')  # JSON string with one object per row
mssparkutils.notebook.exit(json_str)

  • To inspect the output, I ran the pipeline with the Notebook activity and stored the exit value in a Set variable activity using this expression:

@activity('Notebook1').output.status.Output.result.exitValue

  • The exit value is returned as a string, so to work with it as an actual array of objects you can wrap it in the @json() function, for example @json(activity('Notebook1').output.status.Output.result.exitValue).
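To make the string-versus-array distinction concrete, here is a minimal plain-Python sketch of the round trip. It is an illustration only: `rows_to_exit_value` and `parse_exit_value` are hypothetical helper names, and the sample rows are made up. In a real notebook the rows could come from something like `[row.asDict() for row in df.collect()]` and the string would be handed to `mssparkutils.notebook.exit`. On the pipeline side, `json()` plays roughly the role that `json.loads` plays here.

```python
import json


def rows_to_exit_value(rows):
    """Serialize a list of row dicts into the JSON string a notebook would exit with."""
    return json.dumps(rows)


def parse_exit_value(exit_value):
    """Stand-in for the pipeline's json() function: string in, list of objects out."""
    return json.loads(exit_value)


# Hypothetical sample standing in for the query result (fewer than 50 rows).
sample_rows = [{"name": "alice"}, {"name": "bob"}]

exit_value = rows_to_exit_value(sample_rows)  # what exitValue would hold: a string
parsed = parse_exit_value(exit_value)         # what json(...) would give the pipeline

print(type(exit_value).__name__)  # the raw exit value is a plain string
print(len(parsed))                # after parsing, the row count is available
```

Until the string is parsed, operations such as taking the number of rows or looping over the items work on characters, not on rows, which is why the conversion step matters before an "If Condition" or "ForEach" activity.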

CodePudding user response:

One way I found is to explicitly set the notebook's exitValue after creating a dataframe from the query result.

These are the 4 cells that I have in "Notebook1":

%%sql
CREATE OR REPLACE TEMP VIEW Result AS
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id

# Converting the result set to a dataframe
Query = "SELECT * FROM Result"
dfResult = spark.sql(Query)

# Converting the dataframe column that I want to use further to a list
resultVariable = dfResult.select("name").rdd.flatMap(lambda x: x).collect()

The cell above uses a common idiom for converting a dataframe column to a list.

#Attaching the List to exitValue in Output
mssparkutils.notebook.exit(resultVariable)

Then, in the Expression of the "If Condition" activity we can use the following to access this result:

@activity('Notebook1').output.status.Output.result.exitValue

This way, the list passed as the exitValue is available inside the Expression.
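Since the exit value reaches the pipeline as a string, it may be worth serializing the list as JSON before exiting so that the pipeline can parse it back into an array. The sketch below is plain Python and only illustrates the idea: `names_to_exit_value` is a hypothetical helper, and the sample values stand in for `resultVariable`.

```python
import json


def names_to_exit_value(names):
    """Encode a list of column values as a JSON array string."""
    return json.dumps(list(names))


# Hypothetical values standing in for resultVariable from the notebook.
result_variable = ["alice", "bob"]

exit_value = names_to_exit_value(result_variable)
print(exit_value)            # JSON array: double-quoted items
print(str(result_variable))  # Python repr: single-quoted items, not valid JSON
```

In the notebook this would be used as `mssparkutils.notebook.exit(names_to_exit_value(resultVariable))`. The design choice is about the consumer: a JSON array string can be parsed on the pipeline side, whereas the Python repr produced by `str()` uses single quotes and is not parseable as JSON.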

This is one approach that I found. Would be happy to know if there is an easier way to do this.
