PySpark UDF Returns Status Code and Response in Separate withColumn


I have a UDF that calls an API, checks the status_code, and returns the response body.

import json
import requests

def Api(a):
    path = endpoint                      # endpoint, sample and params are defined elsewhere
    headers = {'sample-Key': sample}
    body = [{'text': a}]
    try:
        res = requests.post(path, params=params, headers=headers, json=body)
        data = res.json()
        dumps = json.dumps(data)
    except Exception as e:
        return e
    if res is not None and res.status_code == 200:
        return json.loads(dumps)
    return None

from pyspark.sql.functions import col, udf

udf_Api = udf(Api)
newDF = df.withColumn("output", udf_Api(col("input")))

I can return the json.loads() result and get it into the dataframe. However, my problem is that I also need to persist the status_code in a separate column. So the output would look like:

+---------+-----------+----------+
|    input|status_code|    output|
+---------+-----------+----------+
|inputText|        200|outputText|
+---------+-----------+----------+

So how can I return both req.status_code and the json.loads() result, but put them into separate columns in the dataframe? I thought of returning an array and then splitting it, but I am not sure how to do that.

CodePudding user response:

You can modify your UDF to return a dict instead of a string or an integer, then define a struct schema for the output column.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def Api(a):
    # mocked API call: returns a fixed status code and a JSON string
    return {
        'status': 200,
        'data': '{"a": 1}'
    }

schema = T.StructType([
    T.StructField('status', T.IntegerType()),
    T.StructField('data', T.StringType())
])

(df
    .withColumn('output', F.udf(Api, schema)('col'))
    .select('col', 'output.*')  # 'output.*' expands the struct fields into separate columns
    .show()
)

# +---+------+--------+
# |col|status|    data|
# +---+------+--------+
# | 10|   200|{"a": 1}|
# | 20|   200|{"a": 1}|
# | 30|   200|{"a": 1}|
# +---+------+--------+
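
Applying the same pattern to the original function, the UDF can return both the status code and the response body in one struct, and the nested fields can then be pulled out into their own columns. A minimal sketch, assuming the endpoint, sample and params placeholders from the question and that the input column holds the text to post:

import json
import requests
from pyspark.sql import functions as F
from pyspark.sql import types as T

def Api(a):
    # endpoint, sample and params are placeholders from the original snippet
    headers = {'sample-Key': sample}
    body = [{'text': a}]
    try:
        res = requests.post(endpoint, params=params, headers=headers, json=body)
        # field names must match the struct schema below
        return {'status_code': res.status_code, 'data': json.dumps(res.json())}
    except Exception:
        return {'status_code': None, 'data': None}

schema = T.StructType([
    T.StructField('status_code', T.IntegerType()),
    T.StructField('data', T.StringType())
])

udf_Api = F.udf(Api, schema)

newDF = (df
    .withColumn('output', udf_Api(F.col('input')))
    .select('input',
            F.col('output.status_code').alias('status_code'),
            F.col('output.data').alias('output')))

Keeping status_code as an integer in the schema means failed requests show up as null rather than an error string, which makes filtering on the status straightforward.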