
MohsenAkhavan asked ShaikMaheer-MSFT edited

How to implement some Python functions with an Azure Data Factory activity?

Scenario description:
I have an Azure Service Bus and receive data on a topic.
I also have an Azure Function (Service Bus topic trigger): when a message is received on the Service Bus, this function runs several processing steps on it (see the code below).
The steps the function performs are (details of this code):

  1. Receive a message and convert it to JSON

  2. Check whether the received message is valid

  3. Check an additional condition on the received message

  4. If the above condition is TRUE:

  5. Create an array from the value of a field in the received message

  6. Run feature extraction on the output of step 5

  7. Run normalization on the output of step 6

  8. Run classification on the output of step 7 and add labels to the received message

  9. Insert the output of step 8 into the database

Now I want to know how I can implement and run these functions (steps) as a pipeline of Data Factory activities (or any other guidance or suggestions for this scenario).

My code is:

 import logging
 import json
 import pickle
 import statistics
 import config
 import psycopg2
 import pandas as pd
 import numpy as np
 import azure.functions as func
    
    
 def main(message: func.ServiceBusMessage):
    
     connection_db = psycopg2.connect(
         f"host={config.database_url} dbname=developer user={config.database_username} password={config.database_password}")
     cursor_connection = connection_db.cursor()
    
     """
     this functions validate and filters data with the folloeing criteria:
     message_type==50
     logical_id=='BLOCK'
     """
     message_body = message.get_body().decode("utf-8")
     message_body = message_body.replace(";", ",")
     message_json = json.loads(message_body)
     print("Json Converted")
     if message_json['error'] == {} and message_json['MSG_TYPE_TAG'] != '':
         logging.info("Data is Valid")
     else:
         logging.info("Data Not Valid")
    
     if int(message_json['MSG_TYPE_TAG']) == 50 and message_json['GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG']['logical_name'] == 'BLOCK':
         message_filtered = message_json
    
         """
         this functions makes one array from the recieved array data
         """
    
         def _create_one_array(message_filtered):
             acceleration_array_of_all = []
             temp_array = message_filtered['GET_RIGIDSENSE_SENSOR_ACCELDATA_LOG_TAG']['acceleration_array']
             for value in temp_array:
                 acceleration_array_of_all.append(value)
             return acceleration_array_of_all
    
         """
         features extraction functions
         """
    
         def percent_above_mean(acceleration_array_list):
             percent_above_mean = 0
             mean = np.mean(acceleration_array_list)
             for i in acceleration_array_list:
                 if i > mean:
                     percent_above_mean += 1
             return percent_above_mean/len(acceleration_array_list)
    
         def variation_from_mean(acceleration_array_list):
             variation_from_mean = 0
             mean = np.mean(acceleration_array_list)
             for value in acceleration_array_list:
                 variation_from_mean = variation_from_mean+abs(value-mean)
             return variation_from_mean/len(acceleration_array_list)
    
         def _feature_extraction(acceleration_array):
             feature = dict()
             feature['mean'] = np.mean(acceleration_array)
             feature['max'] = max(acceleration_array)
             feature['min'] = min(acceleration_array)
             feature['std'] = np.std(acceleration_array)
             feature['median'] = statistics.median(acceleration_array)
             feature['L1'] = sum(list(map(abs, acceleration_array)))
             # mean absolute deviation; Series.mad() was removed in pandas 2.0
             s = pd.Series(acceleration_array)
             feature['MAD'] = (s - s.mean()).abs().mean()
             feature['percent_above_mean'] = percent_above_mean(
                 acceleration_array)
             feature['variation_from_mean'] = variation_from_mean(
                 acceleration_array)
             features_dataframe = pd.DataFrame(feature, index=[0])
             return features_dataframe
    
         def _normalization(df):
             # transform() returns a new array rather than modifying df in
             # place, so assign the result back before returning
             scaler = pickle.load(open('scaler.sav', 'rb'))
             df[df.columns] = scaler.transform(df)
             return df
    
         """
         classification function
         """
    
         def _classification_label(normalized_features):
             classifier = pickle.load(
                 open('ExtraTreesClassifier.sav', 'rb'))
             prediction = dict()
             label = classifier.predict(normalized_features).tolist()[0]
             if label == 0:
                 prediction['label'] = 'Hard'
             else:
                 prediction['label'] = 'Easy'
             probability = classifier.predict_proba(normalized_features)
             prediction['probability'] = round(max(probability[0]), 2)
             return prediction

         def _classification(normalized_features):
             label = _classification_label(normalized_features)
             return label
    
         acceleration_array = _create_one_array(message_filtered)
         extracted_features = _feature_extraction(acceleration_array)
         normalized_features = _normalization(extracted_features)
         label = _classification(normalized_features)
         logging.info('functions done')
    
         """
         Insert to database
         """
         message_final = {**message_filtered,
                          **message_filtered['LOG_TAG']}
         del message_final['error']
         del message_final['LOG_TAG']
         del message_final['acceleration_array']
    
         message_final['label'] = []
         message_final['probability'] = []
         message_final['label'] = label['label']
         message_final['probability'] = label['probability']
         cursor_connection.execute(
             '''INSERT into dci_output_lable VALUES (%(MSG_TYPE_TAG)s , %(ATTACHED_DEVICE_SERIAL_NUMBER_TAG)s, %(date_time)s , %(name)s , %(number)s , %(sequence)s , %(label)s , %(probability)s);''', message_final)
         connection_db.commit()
         logging.info("Insert to database done")
    
     else:
         logging.info("Input data isn't BLOCK")


azure-data-factory · azure-functions · azure-service-bus

Hi @MohsenAkhavan ,

Following up to check whether the answer provided below helped you. If yes, please accept it; accepting an answer helps the community too. Please let us know if you have any further queries. Thank you.


1 Answer

ShaikMaheer-MSFT answered ShaikMaheer-MSFT commented

Hi @MohsenAkhavan ,


Thank you for posting your query in Microsoft Q&A Platform.

You can leverage the "Azure Function" activity to run your Azure Function in an Azure Data Factory pipeline.

The Azure Function activity allows you to run Azure Functions in a Data Factory pipeline. To run an Azure Function, you need to create a linked service connection and an activity that specifies the Azure Function that you plan to execute.
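For illustration, an Azure Function activity definition in the pipeline JSON could look roughly like the following sketch. The linked service name, function name, and body expression are placeholders, not taken from the question:

```json
{
    "name": "RunClassificationFunction",
    "type": "AzureFunctionActivity",
    "linkedServiceName": {
        "referenceName": "AzureFunctionLinkedService",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "functionName": "ClassifyMessage",
        "method": "POST",
        "body": {
            "value": "@string(pipeline().parameters.message)",
            "type": "Expression"
        }
    }
}
```

The linked service points at the Function App (URL plus function key); the activity then names the individual function to invoke.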

Please note: the return type of the Azure Function has to be a valid JObject.
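To illustrate that constraint: the activity parses the function's response body as a JObject, so the top-level JSON value must be an object, not an array or a scalar. A minimal sketch of a valid response body shape (the keys here are illustrative, echoing the question's classification output):

```python
import json

# ADF's Azure Function activity deserializes the response body as a
# JObject, so the top-level JSON value must be an object.
valid_body = json.dumps({"label": "Easy", "probability": 0.92})

# A top-level array like this would be rejected by the activity:
invalid_body = json.dumps(["Easy", 0.92])
```

An HTTP-triggered function would return `valid_body` with `Content-Type: application/json`.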

Kindly go through the documentation below to learn how to use the Azure Function activity and for other details.
https://docs.microsoft.com/en-us/azure/data-factory/control-flow-azure-function-activity


By the way, you already mentioned that your Azure Function uses a "Service Bus topic" trigger, which means that whenever data arrives on your topic, the function should run automatically. So I am not sure why you want to run it again from an ADF pipeline. If you really do want to run the same Azure Function explicitly from an ADF pipeline as well, consider giving it an HTTP trigger and invoking it with the Azure Function activity or a Web activity.

Hope this helps. Thank you.





Hi @ShaikMaheer-MSFT ,

Thanks for your response. To clarify: I have a Service Bus that receives data every 5 seconds. I need a pipeline (like an ADF pipeline) that receives data from the Service Bus and runs steps 1-9 above. Is it possible to do this with ADF activities, or do you have any other suggestions?

Let me know if you need more information.


Hi @MohsenAkhavan ,

Thank you for your response.

Azure Data Factory does not have a connector for Service Bus, so it is not possible to receive data directly from Service Bus.

However, you can load your Service Bus data into Blob Storage and then leverage ADF event-based triggers to run your pipeline.
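As a sketch of that approach, a blob event trigger definition might look like this. The container path, pipeline name, and storage account scope below are placeholders:

```json
{
    "name": "ServiceBusBlobCreatedTrigger",
    "properties": {
        "type": "BlobEventsTrigger",
        "typeProperties": {
            "blobPathBeginsWith": "/servicebus-messages/blobs/",
            "ignoreEmptyBlobs": true,
            "scope": "<resource ID of the storage account>",
            "events": ["Microsoft.Storage.BlobCreated"]
        },
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "ProcessServiceBusMessage",
                    "type": "PipelineReference"
                }
            }
        ]
    }
}
```

Each blob written to the watched path then starts one pipeline run, which can pass the blob path to the processing steps as a parameter.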

You can also consider raising a feedback item for this. The product team actively monitors user feedback and considers it for future releases.

You can find a similar discussion at the link below as well.
https://docs.microsoft.com/en-us/answers/questions/111361/how-to-connect-service-bus-queue-to-azure-data-fac.html

Thank you. Please accept the answer if it helped; accepting an answer helps the community as well. Feel free to let us know if you have any further queries.
