Home > database >  Azure function in ADF pipeline using a Python script
Azure function in ADF pipeline using a Python script

Time:11-13

I am trying to run my below script in the Azure Data Factory in a pipeline. My Python code retrieves 2 CSV files from the Blob storage and merges them into one file based on a key and uploads it to the data lake storage. I have tried with function app block which gives me InternalServerError and I also tried Web activity which runs without error. The problem is that the file is not created when I run the pipeline, even though the pipeline runs successfully(with Web block). Locally the function also runs when I call the main function and the file gets created in the data lake storage. I have tried http trigger and durable functions as well in VS Code, but none of them created the “merged.csv” file in Azure.

My Python script(init.py):

import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
    STORAGEACCOUNTKEY= '****'
    LOCALFILENAME= ['file1.csv', 'file2.csv']
    CONTAINERNAME= 'inputblob'

    file1 = pd.DataFrame()
    file2 = pd.DataFrame()
    #download from blob

    blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)

    for i in LOCALFILENAME:
        with open(i, "wb") as my_blobs:
            blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
            blob_data = blob_client_instance.download_blob()
            blob_data.readinto(my_blobs)
            if i == 'file1.csv':
                file1 = pd.read_csv(i)
            if i == 'file2.csv':
                file2 = pd.read_csv(i)
    
    # load

  
    summary = pd.merge(left=file1, right=file2, on='key', how='inner')
        
    summary.to_csv()

    global service_client
            
    service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential='****')
        
    file_system_client = service_client.get_file_system_client(file_system="outputdatalake")

    directory_client = file_system_client.get_directory_client("functionapp") 

    file_client = directory_client.create_file("merged.csv") 

    file_contents = summary.to_csv()

    file_client.upload_data(file_contents, overwrite=True) 

    return("This HTTP triggered function executed successfully.")

My JSON file(function.json):

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

CodePudding user response:

Your code looks good, and it is working fine in another environment. So Internal Server Errors can be caused for below reasons in Azure functions:

  1. Make sure you add all the values from Local.Settings.json file to Application settings (FunctionApp -> Configuration -> Application Settings)

enter image description here

  1. Check for CORS. Try adding “*” (Any request made against a storage resource when CORS is enabled must either have a valid authorization header or must be made against a public resource.)

enter image description here

CodePudding user response:

There are 2 reasons I can think of which may be the cause of your issue.

A - Check your requirements.txt. All your python libraries should be present there. It should looks like this.

azure-functions
pandas==1.3.4
azure-storage-blob==12.9.0
azure-storage-file-datalake==12.5.0

B - Next, it looks like you are writing files into the Functions worker memory. This is not allowed and totally unnecessary. This would explain why it is working in your local machine but not in Azure. You can achieve what you want without doing that. See the below section of code which should serve your purpose. There are mild changes to how we do the load of csv from the blob to dataframe.

import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func
from io import StringIO

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
    STORAGEACCOUNTKEY= 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    LOCALFILENAME= ['file1.csv', 'file2.csv']
    CONTAINERNAME= 'inputblob'

    file1 = pd.DataFrame()
    file2 = pd.DataFrame()
    #download from blob

    blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
    for i in LOCALFILENAME:
            blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
            blob_data = blob_client_instance.download_blob()
            if i == 'file1.csv':
                file1 = pd.read_csv(StringIO(blob_data.content_as_text()))
            if i == 'file2.csv':
                file2 = pd.read_csv(StringIO(blob_data.content_as_text()))

    
    # load
    summary = pd.merge(left=file1, right=file2, on='key', how='inner')
    summary.to_csv()

    service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential=STORAGEACCOUNTKEY)
    file_system_client = service_client.get_file_system_client(file_system="outputdatalake")
    directory_client = file_system_client.get_directory_client("my-directory") 
    file_client = directory_client.create_file("merged.csv") 
    file_contents = summary.to_csv()
    file_client.upload_data(file_contents, overwrite=True) 

    return("This HTTP triggered function executed successfully.")
  • Related