cenh asked MartinJaffer-MSFT commented

How to deal with long running Azure Functions in Azure Data Factory

I have a Function (HttpTrigger) that can take 5-10 minutes to run. It reads some information from the request body to perform some actions:

 [FunctionName("myFunc")]
 public static async Task<IActionResult> Run(
             [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
             ILogger log)
         {
             log.LogInformation("C# HTTP trigger function processed a request.");
    
             string responseMessage = string.Empty;
    
             var content = await new StreamReader(req.Body).ReadToEndAsync();
             myClass body = JsonConvert.DeserializeObject<myClass>(content);

             for (int i = 0; i < 10; i++)
             {
                    await anotherFunctionAsync(i, body);
                    // Perform tasks totaling 5-10 minutes
             }
             return new OkObjectResult("everything went ok");
     }

However, I often run into errors in my Data Factory pipeline like:

Operation on target your_function failed: Call to provided Azure function 'myFunc' failed with status-'BadGateway' and message 502 - Web server received an invalid response while acting as a gateway or proxy server.

I understand that this is due to the Function running for a long time.

How can I deal with this, such that Data Factory waits for the function to properly complete and not fail (when it is still running)?

I have already taken a look at the Durable Functions and Best Practices, but I don't know how to apply it when it has to communicate with Data Factory.

Since the function I call is async, could I just remove waiting for response?

azure-data-factory, azure-functions

MartinJaffer-MSFT answered MartinJaffer-MSFT commented

Hello @ChristianHansen-0520 and thank you for your question.

The general idea is to have the function acknowledge receipt of the request from Data Factory and reply with an endpoint that the Factory can check later to get the status.

In the pipeline, the Function activity would be followed by an Until loop. The Until loop would contain a Web activity that queries the endpoint returned by the Function activity to get its status (success, fail, or running). The loop would repeat as long as the Web activity receives the "running" status from the endpoint, and exit on a success or fail response.
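
As a rough sketch of that pipeline, the JSON below shows one way to wire it up. It assumes the function has been changed to reply immediately with a JSON body in the Durable Functions style: a `statusQueryGetUri` field holding the status endpoint, which in turn returns a `runtimeStatus` field. The activity names, the linked service name `MyFunctionApp`, the request body, the 30-second wait and the one-hour timeout are all placeholders, not values from your setup.

```json
{
  "name": "LongRunningFunctionPipeline",
  "properties": {
    "activities": [
      {
        "name": "StartFunction",
        "type": "AzureFunctionActivity",
        "linkedServiceName": {
          "referenceName": "MyFunctionApp",
          "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "functionName": "myFunc",
          "method": "POST",
          "body": { "exampleInput": "value" }
        }
      },
      {
        "name": "WaitUntilDone",
        "type": "Until",
        "dependsOn": [
          { "activity": "StartFunction", "dependencyConditions": [ "Succeeded" ] }
        ],
        "typeProperties": {
          "expression": {
            "value": "@not(or(equals(activity('CheckStatus').output.runtimeStatus, 'Pending'), equals(activity('CheckStatus').output.runtimeStatus, 'Running')))",
            "type": "Expression"
          },
          "timeout": "0.01:00:00",
          "activities": [
            {
              "name": "WaitBeforePoll",
              "type": "Wait",
              "typeProperties": { "waitTimeInSeconds": 30 }
            },
            {
              "name": "CheckStatus",
              "type": "WebActivity",
              "dependsOn": [
                { "activity": "WaitBeforePoll", "dependencyConditions": [ "Succeeded" ] }
              ],
              "typeProperties": {
                "url": {
                  "value": "@activity('StartFunction').output.statusQueryGetUri",
                  "type": "Expression"
                },
                "method": "GET"
              }
            }
          ]
        }
      }
    ]
  }
}
```

Note that an Until loop stops when its expression becomes true, so the expression is written as "not pending and not running". The loop also exits on a failed status, so a following activity would still need to inspect `runtimeStatus` if the pipeline should fail in that case.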

If you do not want to loop a web activity, there is another option. If you have the Function App write its success/fail to a file, then you can use the Validation activity to do the polling for you. The Validation activity can check for the presence of a file, doing retries for a set amount of time.
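
A minimal sketch of such a Validation activity might look like this. The dataset name `FunctionResultFile` is hypothetical and would need to point at the file your Function App writes; the 30-minute timeout and 30-second sleep between checks are example values only.

```json
{
  "name": "WaitForResultFile",
  "type": "Validation",
  "typeProperties": {
    "dataset": {
      "referenceName": "FunctionResultFile",
      "type": "DatasetReference"
    },
    "timeout": "0.00:30:00",
    "sleep": 30
  }
}
```

The activity succeeds as soon as the file exists and fails if the timeout elapses first. To tell success from failure, the function could write to different file names, or a later activity could read the file content.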

Another option might be to use a Webhook activity. In this case, Data Factory would pass an endpoint to the Function App and expect the Function App to make a separate call back in the allotted time.
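
For illustration, a Webhook activity could be defined roughly as below. The URL, function key, body and 30-minute timeout are placeholders. Data Factory adds a `callBackUri` property to the body it sends, and the activity completes once the Function App makes a POST request to that URI within the timeout.

```json
{
  "name": "CallFunctionViaWebhook",
  "type": "WebHook",
  "typeProperties": {
    "url": "https://<function-app>.azurewebsites.net/api/myFunc?code=<function-key>",
    "method": "POST",
    "headers": { "Content-Type": "application/json" },
    "body": { "exampleInput": "value" },
    "timeout": "00:30:00"
  }
}
```

With this approach the function still has to return its initial HTTP response promptly and finish the long-running work in the background before calling back, so the function itself needs restructuring as well.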

Can you tell me where you see the option in Data Factory to "remove waiting for response?"




Sorry, maybe I wasn't clear about that. In my function I call an async method and wait for it to return a result. I was wondering if I could simply not wait for that method to finish.

I can't find any good examples of how to implement the solutions you suggest, and I don't think the Azure documentation is sufficient. Could you explain how I can restructure my function above to solve the issue?


@cenh thank you for the clarification. Looks like @NasreenAkter-2040 has provided a full example. Can you give that a look and let us know if it solves your issue? If so, please mark Nasreen's answer as accepted.

nasreen-akter answered

Hi @cenh,

Here is some example code for a Durable Function, along with some screenshots showing how to call the Durable Function from Data Factory. Hope it helps. Thanks! :)

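 using System;
 using System.Collections.Generic;
 using System.Net.Http;
 using System.Threading.Tasks;
 using Microsoft.Azure.WebJobs;
 using Microsoft.Azure.WebJobs.Extensions.DurableTask;
 using Microsoft.Azure.WebJobs.Extensions.Http;
 using Microsoft.Extensions.Logging;
 using Newtonsoft.Json;
 using Newtonsoft.Json.Linq;

 namespace abc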
 {
     public static class DF_Parse
     {
            
         [FunctionName("DF_Parse_HttpStart")]
         public static async Task<HttpResponseMessage> HttpStart([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
         {
             // Function input comes from the request content.
             string requestBody = await req.Content.ReadAsStringAsync();
             string instanceId = await starter.StartNewAsync("DF_Parse", null, requestBody);
    
             log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
    
             return starter.CreateCheckStatusResponse(req, instanceId);
         }
    
         [FunctionName("DF_Parse")]
         public static async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
         {
             log.LogInformation("DF_Parse orchestrator started.");
             try
             {
                 string sRequestBody = context.GetInput<string>();
                 List<Task<string>> contents = new List<Task<string>>();
                 myClass body = JsonConvert.DeserializeObject<myClass>(sRequestBody);
                 for (int i = 0; i < 10; i++)
                 {
                     JObject parameters = new JObject();
                     parameters.Add("iParam", i);
                     parameters.Add("myClass", JsonConvert.SerializeObject(body));
                     contents.Add(context.CallSubOrchestratorAsync<string>("DF_Parse_AnotherFunctionAsync", JsonConvert.SerializeObject(parameters)));
                 }
                 await Task.WhenAll(contents);
    
                 /* you can also check and work further with the response on each call if you want
                 //create a dictionary object from the responses
                 foreach (Task<string> eachTask in contents)
                 {
                     if (eachTask.IsCompletedSuccessfully)
                     {
                         record = ParseJsonString(eachTask.Result, log);
    
                         if (record.Count > 0)
                         {
                             totalRecords.Add(record);
                         }
                     }
                 }
                 */
             }
             catch (Exception ex)
             {
                 log.LogError(ex, "DF_Parse failed!");
                 throw; // rethrow without resetting the stack trace
             }
         }
    
          
         [FunctionName("DF_Parse_AnotherFunctionAsync")]
         public static async Task<string> AnotherFunctionAsync([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log) 
         {
             try
             {
                 string input = context.GetInput<string>();
    
                 // Run the actual work in an activity function and return its result as a
                 // JSON string, matching the CallSubOrchestratorAsync<string> call in DF_Parse.
                 List<string> list = await context.CallActivityAsync<List<string>>("DF_Parse_ActivityFunction", input);
                 return JsonConvert.SerializeObject(list);
             }
             catch (Exception ex)
             {
                 log.LogError(ex, "DF_Parse_AnotherFunctionAsync failed!");
                 throw;
             }
         }
    
         [FunctionName("DF_Parse_ActivityFunction")]
         public static async Task<List<string>> ActivityFunction([ActivityTrigger] string input, ILogger log)
         {
             log.LogInformation($"ActivityFunction() started...");
    
             Dictionary<string, dynamic> parameters;
             List<string> outputList = new List<string>();
    
             try
             {
                 parameters = JsonConvert.DeserializeObject<Dictionary<string, dynamic>>(input);
                //function body
                 log.LogInformation("Successful!");
             }
             catch (Exception ex)
             {
                 log.LogError(ex, $"Failed with Error: {ex.Message}");
                 throw;
             }
    
             return outputList;
         }
     }
 }


 //host.json file
    
 {
   "version": "2.0",
   "extensions": {
     "durableTask": {
       "hubName": "ParseTaskQueue",
       "maxConcurrentActivityFunctions": 20,
       "maxConcurrentOrchestratorFunctions": 20,
       "extendedSessionsEnabled": false,
       "extendedSessionIdleTimeoutInSeconds": 30,
       "useGracefulShutdown": false
     }
   },
   "logging": {
     "logLevel": {
       "Host.Triggers.DurableTask": "Information"
     }
   }
 }

[Screenshots: durableazurefunction.jpg, webactivity.jpg, checkstatus.jpg]
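
For reference, the status endpoint that Data Factory polls (the `statusQueryGetUri` value in the response from `DF_Parse_HttpStart`) returns JSON shaped roughly like the example below while the orchestration is still in progress. The values in angle brackets are placeholders, not real output.

```json
{
  "name": "DF_Parse",
  "instanceId": "<instanceId>",
  "runtimeStatus": "Running",
  "input": "<request body>",
  "customStatus": null,
  "output": null,
  "createdTime": "<timestamp>",
  "lastUpdatedTime": "<timestamp>"
}
```

The `runtimeStatus` field is the one to test in the pipeline: it moves from `Pending` to `Running` and ends as `Completed`, `Failed` or `Terminated`.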




