How to process all input files using Azure Function blob_trigger with Python V2 and append data to an output blob

Maryna Paluyanava 191 Reputation points
2024-04-16T10:44:34.41+00:00

Hello,

 

Could anybody help me with my task?

The task is the following. I have a blob storage, where new files are loaded into the “input” folder.

I need to create an Azure Function (blob trigger) that takes the input files one by one, processes them, and creates 2 dataframes for each input file. These 2 dataframes for all input files should be combined (appended), so that as a result we have 2 CSV files in the output folder: “data.csv” and “details.csv”. How can we append data to a blob? How can duplicates be removed from that blob?

Could you please suggest how to implement this logic correctly?

My code is the following. It works in the Storage Emulator if all files are loaded to the storage at the same time, but it overwrites the output blob if a new file is added later.

Could you please suggest a resource to read about Azure Functions with the Python V2 programming model?

Thank you!!!

import logging
import azure.functions as func
import pandas as pd
import processing_functions as pf


app = func.FunctionApp()

df_data = pd.DataFrame(columns=['id', 'name', 'quantity', 'cost'])
df_details = pd.DataFrame(columns=['id', 'details'])

           
@app.blob_trigger(arg_name="inputblob",
                path="data/input/{name}",
                connection="AzureWebJobsStorage")
@app.blob_output(arg_name="outputdatablob",
                path="data/output/data.csv",
                connection="AzureWebJobsStorage")
@app.blob_output(arg_name="outputdetailsblob",
                path="data/output/details.csv",
                connection="AzureWebJobsStorage")
def main(inputblob: func.InputStream, outputdatablob: func.Out[bytes], outputdetailsblob: func.Out[bytes]):
    logging.info(f"Python blob trigger function processed blob \n"
                f"Name: {inputblob.name}\n")

    global df_data
    global df_details 

    df_input_delta = pd.read_csv(inputblob, encoding='utf-8')

    df_data_delta = pf.process_data(df_input_delta)
    df_details_delta = pf.process_details(df_input_delta)

    df_data = pd.concat(
        [
            df_data,
            df_data_delta,
        ], axis=0, ignore_index=True
    ).drop_duplicates()

    df_details = pd.concat(
        [
            df_details,
            df_details_delta,
        ], axis=0, ignore_index=True
    ).drop_duplicates()


    output_data_csv = df_data.to_csv(index=False)
    outputdatablob.set(output_data_csv.encode('utf-8'))

    output_details_csv = df_details.to_csv(index=False)
    outputdetailsblob.set(output_details_csv.encode('utf-8'))



Accepted answer
  1. Sina Salam 3,721 Reputation points
    2024-04-16T16:18:01.3+00:00

    Hello Maryna Paluyanava,

    Welcome to the Microsoft Q&A and thank you for posting your questions here.

    Problem

    From your questions, I understand that you need to create an Azure Function (blob trigger) that processes new files uploaded to the "input" folder in blob storage. The function should generate two dataframes (df_data and df_details) for each input file and combine them with previously processed data. The combined data should then be written to two CSV files (data.csv and details.csv) in the "output" folder. The function should handle duplicate data efficiently and maintain the state of processed data between function invocations.

    The challenges break down into three major areas:

    • Overwriting Output Blobs
    • Duplicate Data Handling
    • State Management

    Scenarios

    Based on your description, the solution needs to handle new file uploads, state management, and efficient duplicate handling:

    • When a new file is uploaded to the "input" folder, the function should process it, append the data to existing dataframes, remove duplicates, and update the output CSV files accordingly.
    • The function should maintain the state of processed data between invocations to prevent data loss or duplication. This includes handling multiple function executions and maintaining the integrity of the output CSV files.
    • As the function processes large datasets, it should efficiently handle duplicate data to optimize processing time and resource utilization (a short pandas sketch follows this list).
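
    As a quick illustration of the duplicate-handling point (a sketch only; it assumes the 'id' column from your dataframes is the identifying key), pandas can deduplicate on the key columns after concatenation:

    import pandas as pd

    # Rows already in the output plus the rows produced from one new input file
    df_existing = pd.DataFrame({'id': [1, 2], 'name': ['a', 'b'], 'quantity': [5, 3], 'cost': [10.0, 6.0]})
    df_delta = pd.DataFrame({'id': [2, 3], 'name': ['b', 'c'], 'quantity': [3, 7], 'cost': [6.0, 14.0]})

    # Concatenate, then keep the most recent row for each id
    df_combined = (
        pd.concat([df_existing, df_delta], ignore_index=True)
        .drop_duplicates(subset=['id'], keep='last')
    )
    print(df_combined)  # rows for ids 1, 2, 3 with no duplicates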

    Solution

    I have reviewed your code carefully. It essentially defines an Azure Function that triggers on new blob uploads, processes the input CSV files, appends the processed data to global DataFrames, and writes the combined data to the output CSV files via the specified output bindings. Because those global DataFrames live only in the worker process's memory, they are not guaranteed to persist between invocations, which is why the output blobs end up being overwritten when a new file arrives later.

    However, to implement the logic correctly for your Azure Function with Python V2, ensuring that new files do not overwrite existing output data and that duplicates are removed, you can make a few modifications to your code to maintain state and handle appending and duplicates. Below is a modified version of your code (blob input bindings are added so the existing output files can be read, since an Out[bytes] output binding cannot be read back):

    import logging
    import azure.functions as func
    import pandas as pd
    import processing_functions as pf

    app = func.FunctionApp()

    def read_blob_as_df(blob: func.InputStream):
        try:
            df = pd.read_csv(blob, encoding='utf-8')
            return df
        except Exception as e:
            logging.error(f"Error reading blob: {e}")
            return None

    def write_df_to_blob(df, output_blob):
        try:
            csv_data = df.to_csv(index=False).encode('utf-8')
            output_blob.set(csv_data)
            return True
        except Exception as e:
            logging.error(f"Error writing dataframe to blob: {e}")
            return False

    @app.blob_trigger(arg_name="inputblob",
                      path="data/input/{name}",
                      connection="AzureWebJobsStorage")
    @app.blob_input(arg_name="existingdatablob",
                    path="data/output/data.csv",
                    connection="AzureWebJobsStorage")
    @app.blob_input(arg_name="existingdetailsblob",
                    path="data/output/details.csv",
                    connection="AzureWebJobsStorage")
    @app.blob_output(arg_name="outputdatablob",
                     path="data/output/data.csv",
                     connection="AzureWebJobsStorage")
    @app.blob_output(arg_name="outputdetailsblob",
                     path="data/output/details.csv",
                     connection="AzureWebJobsStorage")
    def main(inputblob: func.InputStream,
             existingdatablob: func.InputStream,
             existingdetailsblob: func.InputStream,
             outputdatablob: func.Out[bytes],
             outputdetailsblob: func.Out[bytes]):
        logging.info(f"Python blob trigger function processed blob \n"
                     f"Name: {inputblob.name}\n")

        # Read existing data from the current output files via blob *input* bindings.
        # (An Out[bytes] output binding cannot be read, so separate input bindings
        # are declared above for the same output paths.)
        df_data = read_blob_as_df(existingdatablob) if existingdatablob else None
        df_details = read_blob_as_df(existingdetailsblob) if existingdetailsblob else None

        # Process input blob
        df_input = read_blob_as_df(inputblob)
        if df_input is None:
            return

        # Process input data
        df_data_delta = pf.process_data(df_input)
        df_details_delta = pf.process_details(df_input)

        # Append new data and remove duplicates
        # (pd.concat silently drops None entries, so the first run still works)
        df_data = pd.concat([df_data, df_data_delta], ignore_index=True).drop_duplicates()
        df_details = pd.concat([df_details, df_details_delta], ignore_index=True).drop_duplicates()

        # Write back to output blobs
        if not write_df_to_blob(df_data, outputdatablob):
            return
        if not write_df_to_blob(df_details, outputdetailsblob):
            return

    The code above uses Azure Blob Storage itself to maintain the state of the processed dataframes: it reads the existing data from the current output files, combines it with the new data, and then writes the result back to the output blobs.

    Secondly, instead of overwriting the entire blob with each new file, read the existing data from the output blobs, append the new data, and then remove duplicates before writing back to the blobs.

    This modified code ensures that existing data is read from the current output files (through the added blob input bindings), new data is appended, duplicates are removed, and the updated dataframes are written back through the output bindings without losing the existing content. Additionally, error handling is included to handle exceptions during reading and writing operations.
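
    If the blob input bindings are not convenient (for example, on the very first run, when data.csv and details.csv do not exist yet and there is nothing for the bindings to read), the current output files can instead be read directly with the azure-storage-blob SDK. The following is a minimal sketch, not part of your original code; it assumes the connection string is available in the AzureWebJobsStorage application setting and that the container is named data:

    import io
    import os
    from typing import Optional

    import pandas as pd
    from azure.core.exceptions import ResourceNotFoundError
    from azure.storage.blob import BlobServiceClient

    def read_existing_csv(container: str, blob_name: str) -> Optional[pd.DataFrame]:
        """Return an existing output CSV as a DataFrame, or None if it does not exist yet."""
        service = BlobServiceClient.from_connection_string(os.environ["AzureWebJobsStorage"])
        blob_client = service.get_blob_client(container=container, blob=blob_name)
        try:
            data = blob_client.download_blob().readall()
        except ResourceNotFoundError:
            return None  # first run: no output file has been written yet
        return pd.read_csv(io.BytesIO(data), encoding='utf-8')

    # Usage inside main(), in place of the blob input bindings:
    # df_data = read_existing_csv("data", "output/data.csv")
    # df_details = read_existing_csv("data", "output/details.csv")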

    Finally

    By following these steps provided above and utilizing the recommended resources, you can effectively address the identified problems and implement a robust solution for your Azure Function.

    • To efficiently maintain the state of processed data between function invocations, we can use Azure Blob Storage to store the current state of the dataframes (df_data and df_details).
    • To efficiently handle duplicate data in the combined dataframes, we can use Pandas' built-in functions to remove duplicates before writing the data to CSV files.
    • To ensure that output blobs are not simply overwritten with each new file, the function reads the existing output, merges in the new data, and rewrites it. If plain appending (without deduplication) is acceptable, Azure Blob Storage's append blob feature is an alternative; a short sketch follows this list.
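
    As a brief illustration of the append blob option (a sketch only, with an illustrative blob name, assuming the azure-storage-blob package is installed): an append blob only accepts new blocks at its end, so you would append just the newly processed rows and handle any deduplication before appending.

    import os
    from azure.storage.blob import BlobServiceClient

    def append_csv_rows(container: str, blob_name: str, csv_rows: str) -> None:
        """Append CSV rows to an append blob, creating it on first use."""
        service = BlobServiceClient.from_connection_string(os.environ["AzureWebJobsStorage"])
        blob_client = service.get_blob_client(container=container, blob=blob_name)
        if not blob_client.exists():
            blob_client.create_append_blob()  # create an empty append blob once
        blob_client.append_block(csv_rows.encode('utf-8'))

    # Usage: append only the newly processed rows (skip the header after the first write)
    # append_csv_rows("data", "output/data_log.csv",
    #                 df_data_delta.to_csv(index=False, header=False))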

    References

    For further information on Azure Functions with Python V2 and Azure Blob Storage, you can refer to the following resources:

    Azure Functions Python Developer Guide.

    Azure Blob Storage Documentation.


    I hope this is helpful! Do not hesitate to let me know if you have any other questions.

    Please remember to "Accept Answer" if the answer helped, so that others in the community facing similar issues can easily find the solution.

    Best Regards,

    Sina Salam

