How to transform an Azure Function into Azure durable Function in Python?

11-4688 61 Reputation points
2023-05-30T07:03:57.77+00:00

I'm stuck with transforming an Azure HTTP triggered function into something more robust that can take more than 230 seconds.

I struggle with dividing the code into functions, not sure how to construct the activity, orchestrator and client function in my case. I would really appreciate some help here.

My code for init.py in http triggered function is below; I use the google_search.py module to extract the results of the Search into Pandas Dataframe.


import azure.functions as func
from googleapiclient import discovery
import pandas as pd
from tqdm import tqdm
import json
from google_search import search
import os


def main(req: func.HttpRequest) -> func.HttpResponse:
    try:
        # Access sensitive credentials
        api_key = os.getenv('apikey')
        cse_id = os.getenv('cseid')

        req_body = req.get_json()
        search_terms = req_body.get('search_terms')
        num_results = int(req_body.get('num_results', 5))
        country_code = req_body.get('country_code', 'uk')

        # Validate the required parameters
        if not search_terms or not api_key or not cse_id:
            return func.HttpResponse('Missing required parameters', status_code=400)

        # Call the search function with the provided arguments
        df = search(search_terms=search_terms,
                    num_results=num_results,
                    country_code=country_code,
                    api_key=api_key,
                    cse_id=cse_id)

        # Convert DataFrame to a list of dictionaries
        response_data = df.to_dict(orient='records')

        # Serialize the response data to JSON
        response_json = json.dumps(response_data)

        # Return the response as a JSON array
        return func.HttpResponse(response_json, mimetype='application/json')
    except Exception as e:
        return func.HttpResponse(f'An error occurred: {str(e)}', status_code=500)
Azure Functions
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
4,285 questions
{count} votes

1 answer

Sort by: Most helpful
  1. MayankBargali-MSFT 68,641 Reputation points
    2023-05-31T10:25:07.02+00:00

    @11-4688 Thanks for reaching out.

    Please refer to orchestrator function and activity function document for more details.

    Create an orchestrator function that will handle the workflow. The orchestrator function will call the search function and wait for the results to be returned. If the search function takes longer than the default timeout (which is 10 minutes), the orchestrator function will automatically checkpoint its state and return a response indicating that the operation is still in progress.

    import azure.functions as func
    import json
    import os
    from typing import List
    from google_search import search
    from azure.durable_functions import (
        durable_task,
        TaskOrchestrationContext,
        TaskActivityContext
    )
    
    @durable_task
    def search_orchestrator(context: TaskOrchestrationContext, search_terms: str, num_results: int, country_code: str) -> List[dict]:
        api_key = os.getenv('apikey')
        cse_id = os.getenv('cseid')
    
        # Call the search function with the provided arguments
        df = yield context.call_activity('search_activity', search_terms, num_results, country_code, api_key, cse_id)
    
        # Convert DataFrame to a list of dictionaries
        response_data = df.to_dict(orient='records')
    
        # Serialize the response data to JSON
        response_json = json.dumps(response_data)
    
        # Return the response as a JSON array
        return response_json
    
    

    Create an activity function that will perform the search operation. The activity function will be called by the orchestrator function and will perform the search operation. If the search operation takes longer than the default timeout, the activity function will automatically checkpoint its state and return a response indicating that the operation is still in progress.

    @durable_task
    def search_activity(context: TaskActivityContext, search_terms: str, num_results: int, country_code: str, api_key: str, cse_id: str) -> pd.DataFrame:
        # Call the search function with the provided arguments
        df = search(search_terms=search_terms,
     num_results, country_code=country_code, api_key=api_key, cse_id=cse_id)
    # Return the search results as a Pandas DataFrame
    return df
    

    Now Create a client function that will trigger the orchestrator function. The client function will be called by the HTTP trigger and will start the search workflow by calling the orchestrator function.

    Note: The above is only for reference and please test and modify as per your input as this is based on your existing function. I couldn't test with the input value as per you are passing.