I have a Main pipeline in Synapse workspace which has 2 activities:
1st - Notebook activity
2nd - If Condition activity
For the 1st one (Synapse notebook, spark pool, pyspark), I have a SQL cell like the following:
It has a simple query using a join:
%%sql
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id
This will return some rows to me (< 50 rows)
Now I want to access this result set of less than 50 rows in the "If Condition" activity in Synapse Pipeline,
How do I perform this?
According to docs, I should be able to use the following:
@activity(‘Notebook1’).output.status.Output.result.exitValue
But the exitValue that I'm getting in the synapse notebook output is null. How do I access this result set in "If Condition" then?
CodePudding user response:
You have to return the value from notebook using
mssparkutils.notebook.exit
to access it from your pipeline using@activity(‘Notebook1’).output.status.Output.result.exitValue
.Instead of using an SQL cell, you can use
spark.sql
and store the result in a dataframe usingdf = spark.sql(Query)
.You can either choose to return the entire dataframe data to pipeline, or just the number of records using
dataframe.count()
(if you want to verify the count of records).
mssparkutils.notebook.exit(str(df.count())) #where df is the dataframe
- If you want to return the entire data as array of objects so that the data can be used to iterate through and for other operations in the pipeline, you can use the following code:
x = df.toPandas()
json = x.to_json(orient = 'records' )
mssparkutils.notebook.exit(json)
- When I run the pipeline with notebook activity and store it in set variable to show the output, it would give the following result:
@activity('Notebook1').output.status.Output.result.exitValue
- You can convert this array of objects from string (since it is returned as string) to an actual object using
@json
method.
CodePudding user response:
So 1 way I found is to explicitly attach an output to the exitValue of the Notebook after creating a Dataframe by doing the following:
Following are the 4 cells that I have in "Notebook1" :
%%sql
CREATE OR REPLACE A TEMP VIEW Result AS
SELECT A.name FROM A
LEFT JOIN B ON A.id = B.id
#Converting the result set to a dataframe
Query = "SELECT * FROM Result"
dfResult = spark.sql(Query)
#Converting the dataframe column that I want to use further to a List
resultVariable = dfResult.select(name).rdd.flatMap(lambda x:x).collect()
Refer this to convert a dataframe column to list
#Attaching the List to exitValue in Output
mssparkutils.notebook.exit(resultVariable)
Then, in the Expression of the "If Condition" activity we can use the following to access this result:
activity(Notebook1).output.status.Output.result.exitValue
So, in this way inside the Expression now you will be able to access the List that you have passed earlier in the exitValue
This is one approach that I found. Would be happy to know if there is an easier way to do this.